Commit 16578880 authored by Christos Stavrakakis's avatar Christos Stavrakakis
Browse files

cyclades: Count quotas for volumes

Update Cyclades quotas to count the size of the Volumes in the
'cyclades.disk' resources:

* Update server creation to count disk quota, not based on the server
  flavor, but on the newly created volumes (those volume which are in
  'CREATING' status)
* Update volume attach/detach server commands to pass a list of quotable
  volume changes
* Update snf-dispatcher to check if an unexpected Ganeti job changes the
  disks of an instance. Only disks from which the Synnefo is aware (and
  so the exist in DB as Volumes) are taken into account.
* Update quota reconciliation mechanism to count 'cyclades.disk' based
  on user volumes.
parent 042f07c3
......@@ -995,6 +995,7 @@ class Volume(models.Model):
("DETACHING", "The volume is detaching from an instance"),
("IN_USE", "The volume is attached to an instance"),
("DELETING", "The volume is being deleted"),
("DELETED", "The volume has been deleted"),
("ERROR", "An error has occured with the volume"),
("ERROR_DELETING", "There was an error deleting this volume"),
("BACKING_UP", "The volume is being backed up"),
......
This diff is collapsed.
......@@ -302,7 +302,7 @@ class BackendReconciler(object):
created__lte=building_time) \
.order_by("id")
gnt_nics = gnt_server["nics"]
gnt_nics_parsed = backend_mod.process_ganeti_nics(gnt_nics)
gnt_nics_parsed = backend_mod.parse_instance_nics(gnt_nics)
nics_changed = len(db_nics) != len(gnt_nics)
for db_nic, gnt_nic in zip(db_nics, sorted(gnt_nics_parsed.items())):
gnt_nic_id, gnt_nic = gnt_nic
......@@ -321,9 +321,11 @@ class BackendReconciler(object):
self.log.info(msg, server_id, db_nics_str, gnt_nics_str)
if self.options["fix_unsynced_nics"]:
vm = get_locked_server(server_id)
backend_mod.process_net_status(vm=vm,
etime=self.event_time,
nics=gnt_nics)
backend_mod.process_op_status(
vm=vm, etime=self.event_time, jobid=-0,
opcode="OP_INSTANCE_SET_PARAMS", status='success',
logmsg="Reconciliation: simulated Ganeti event",
nics=gnt_nics)
def reconcile_unsynced_disks(self, server_id, db_server, gnt_server):
building_time = self.event_time - BUILDING_NIC_TIMEOUT
......@@ -332,7 +334,7 @@ class BackendReconciler(object):
.filter(deleted=False)\
.order_by("id")
gnt_disks = gnt_server["disks"]
gnt_disks_parsed = backend_mod.process_ganeti_disks(gnt_disks)
gnt_disks_parsed = backend_mod.parse_instance_disks(gnt_disks)
disks_changed = len(db_disks) != len(gnt_disks)
for db_disk, gnt_disk in zip(db_disks,
sorted(gnt_disks_parsed.items())):
......@@ -352,9 +354,11 @@ class BackendReconciler(object):
self.log.info(msg, server_id, db_disks_str, gnt_disks_str)
if self.options["fix_unsynced_disks"]:
vm = get_locked_server(server_id)
backend_mod.process_disks_status(vm=vm,
etime=self.event_time,
disks=gnt_disks)
backend_mod.process_op_status(
vm=vm, etime=self.event_time, jobid=-0,
opcode="OP_INSTANCE_SET_PARAMS", status='success',
logmsg="Reconciliation: simulated Ganeti event",
disks=gnt_disks)
def reconcile_pending_task(self, server_id, db_server):
job_id = db_server.task_job_id
......
......@@ -36,7 +36,6 @@ from synnefo.logic import backend, commands
log = logging.getLogger(__name__)
@commands.server_command("ATTACH_VOLUME")
def attach_volume(vm, volume):
"""Attach a volume to a server.
......@@ -60,24 +59,36 @@ def attach_volume(vm, volume):
raise faults.BadRequest(msg)
# Check maximum disk per instance hard limit
if vm.volumes.filter(deleted=False).count() == settings.GANETI_MAX_DISKS_PER_INSTANCE:
vm_volumes_num = vm.volumes.filter(deleted=False).count()
if vm_volumes_num == settings.GANETI_MAX_DISKS_PER_INSTANCE:
raise faults.BadRequest("Maximum volumes per server limit reached")
jobid = backend.attach_volume(vm, volume)
if volume.status == "CREATING":
action_fields = {"disks": [("add", volume, {})]}
else:
action_fields = {}
comm = commands.server_command("ATTACH_VOLUME",
action_fields=action_fields)
return comm(_attach_volume)(vm, volume)
def _attach_volume(vm, volume):
"""Attach a Volume to a VM and update the Volume's status."""
jobid = backend.attach_volume(vm, volume)
log.info("Attached volume '%s' to server '%s'. JobID: '%s'", volume.id,
volume.machine_id, jobid)
volume.backendjobid = jobid
volume.machine = vm
volume.status = "ATTACHING"
if volume.status == "AVAILALBE":
volume.status = "ATTACHING"
else:
volume.status = "CREATING"
volume.save()
return jobid
@commands.server_command("DETACH_VOLUME")
def detach_volume(vm, volume):
"""Detach a volume to a server.
"""Detach a Volume from a VM
The volume must be in 'IN_USE' status in order to be detached. Also,
the root volume of the instance (index=0) can not be detached. This
......@@ -87,23 +98,34 @@ def detach_volume(vm, volume):
"""
_check_attachment(vm, volume)
if volume.status != "IN_USE":
#TODO: Maybe allow other statuses as well ?
if volume.status not in ["IN_USE", "ERROR"]:
raise faults.BadRequest("Cannot detach volume while volume is in"
" '%s' status." % volume.status)
if volume.index == 0:
raise faults.BadRequest("Cannot detach the root volume of a server")
action_fields = {"disks": [("remove", volume, {})]}
comm = commands.server_command("DETACH_VOLUME",
action_fields=action_fields)
return comm(_detach_volume)(vm, volume)
def _detach_volume(vm, volume):
"""Detach a Volume from a VM and update the Volume's status"""
jobid = backend.detach_volume(vm, volume)
log.info("Detached volume '%s' from server '%s'. JobID: '%s'", volume.id,
volume.machine_id, jobid)
volume.backendjobid = jobid
volume.status = "DETACHING"
if volume.delete_on_termination:
volume.status = "DELETING"
else:
volume.status = "DETACHING"
volume.save()
return jobid
def _check_attachment(vm, volume):
"""Check that volume is attached to vm."""
"""Check that the Volume is attached to the VM"""
if volume.machine_id != vm.id:
raise faults.BadRequest("Volume '%s' is not attached to server '%s'"
% volume.id, vm.id)
......@@ -101,6 +101,10 @@ def id_from_disk_name(name):
return int(ns)
def id_to_disk_name(id):
return "%svol-%s" % (settings.BACKEND_PREFIX_ID, str(id))
def get_rsapi_state(vm):
"""Returns the API state for a virtual machine
......@@ -175,6 +179,7 @@ OPCODE_TO_ACTION = {
def get_action_from_opcode(opcode, job_fields):
if opcode == "OP_INSTANCE_SET_PARAMS":
nics = job_fields.get("nics")
disks = job_fields.get("disks")
beparams = job_fields.get("beparams")
if nics:
try:
......@@ -187,6 +192,17 @@ def get_action_from_opcode(opcode, job_fields):
return None
except:
return None
if disks:
try:
disk_action = disks[0][0]
if disk_action == "add":
return "ATTACH_VOLUME"
elif disk_action == "remove":
return "DETACH_VOLUME"
else:
return None
except:
return None
elif beparams:
return "RESIZE"
else:
......
......@@ -29,10 +29,11 @@
from django.utils import simplejson as json
from django.db import transaction
from django.db.models import Sum
from snf_django.lib.api import faults
from synnefo.db.models import (QuotaHolderSerial, VirtualMachine, Network,
IPAddress)
IPAddress, Volume)
from synnefo.settings import (CYCLADES_SERVICE_TOKEN as ASTAKOS_TOKEN,
ASTAKOS_AUTH_URL)
......@@ -334,11 +335,13 @@ def get_commission_info(resource, action, action_fields=None):
flavor = resource.flavor
resources = {"cyclades.vm": 1,
"cyclades.total_cpu": flavor.cpu,
"cyclades.disk": 1073741824 * flavor.disk,
"cyclades.total_ram": 1048576 * flavor.ram}
"cyclades.total_ram": flavor.ram << 20}
online_resources = {"cyclades.cpu": flavor.cpu,
"cyclades.ram": 1048576 * flavor.ram}
"cyclades.ram": flavor.ram << 20}
if action == "BUILD":
new_volumes = resource.volumes.filter(status="CREATING")
new_volumes_size = new_volumes.aggregate(Sum("size"))["size__sum"]
resources["cyclades.disk"] = int(new_volumes_size) << 30
resources.update(online_resources)
return resources
if action == "START":
......@@ -357,6 +360,11 @@ def get_commission_info(resource, action, action_fields=None):
else:
return None
elif action == "DESTROY":
volumes = resource.volumes.filter(deleted=False)
volumes_size = volumes.aggregate(Sum("size"))["size__sum"]
if volumes_size is None:
volumes_size = 0
resources["cyclades.disk"] = int(volumes_size) << 30
if resource.operstate in ["STARTED", "BUILD", "ERROR"]:
resources.update(online_resources)
return reverse_quantities(resources)
......@@ -365,11 +373,18 @@ def get_commission_info(resource, action, action_fields=None):
cpu = beparams.get("vcpus", flavor.cpu)
ram = beparams.get("maxmem", flavor.ram)
return {"cyclades.total_cpu": cpu - flavor.cpu,
"cyclades.total_ram": 1048576 * (ram - flavor.ram)}
"cyclades.total_ram": (ram - flavor.ram) << 20}
elif action == "REASSIGN":
if resource.operstate in ["STARTED", "BUILD", "ERROR"]:
resources.update(online_resources)
return resources
elif action in ["ATTACH_VOLUME", "DETACH_VOLUME"]:
if action_fields is not None:
volumes_changes = action_fields.get("disks")
if volumes_changes is not None:
size_delta = get_volumes_size_delta(volumes_changes)
if size_delta:
return {"cyclades.disk": size_delta << 30}
else:
#["CONNECT", "DISCONNECT", "SET_FIREWALL_PROFILE"]:
return None
......@@ -392,6 +407,31 @@ def get_commission_info(resource, action, action_fields=None):
return resources
else:
return None
elif isinstance(resource, Volume):
size = resource.size
resources = {"cyclades.disk": size << 30}
if resource.status == "CREATING" and action == "BUILD":
return resources
elif action == "DESTROY":
reverse_quantities(resources)
else:
return None
def get_volumes_size_delta(volumes_changes):
"""Compute the total change in the size of volumes"""
size_delta = 0
for vchange in volumes_changes:
action, db_volume, info = vchange
if action == "add":
size_delta += int(db_volume.size)
elif action == "remove":
size_delta -= int(db_volume.size)
elif action == "modify":
size_delta += info.get("size_delta", 0)
else:
raise ValueError("Unknwon volume action '%s'" % action)
return size_delta
def reverse_quantities(resources):
......
......@@ -45,6 +45,8 @@ from synnefo.quotas import util
class GetDBHoldingsTestCase(TestCase):
maxDiff = None
def test_no_holdings(self):
holdings = util.get_db_holdings(user=None)
self.assertEqual(holdings, {})
......@@ -52,16 +54,16 @@ class GetDBHoldingsTestCase(TestCase):
def test_vm_holdings(self):
flavor = mfactory.FlavorFactory(cpu=24, ram=8192, disk=20,
disk_template='drbd')
mfactory.VirtualMachineFactory()
mfactory.VirtualMachineFactory(userid="user1", deleted=True)
mfactory.VirtualMachineFactory(flavor=flavor, userid="user1",
operstate="BUILD")
user_holdings = {"user1": {None:
{"cyclades.vm": 1,
"cyclades.total_cpu": 24,
"cyclades.cpu": 24,
"cyclades.disk": 21474836480,
"cyclades.total_ram": 8589934592,
"cyclades.ram": 8589934592}}}
mfactory.VolumeFactory(userid="user1", size=20, machine=None)
user_holdings = {"user1": {None: {"cyclades.vm": 1,
"cyclades.total_cpu": 24,
"cyclades.cpu": 24,
"cyclades.disk": 20 << 30,
"cyclades.total_ram": 8192 << 20,
"cyclades.ram": 8192 << 20}}}
holdings = util.get_db_holdings(user="user1")
self.assertEqual(holdings, user_holdings)
holdings = util.get_db_holdings()
......@@ -70,20 +72,20 @@ class GetDBHoldingsTestCase(TestCase):
##
mfactory.VirtualMachineFactory(flavor=flavor, userid="user2",
operstate="STARTED")
mfactory.VolumeFactory(userid="user2", size=30, machine=None)
user_holdings = {"user2": {None: {"cyclades.vm": 1,
"cyclades.total_cpu": 24,
"cyclades.cpu": 24,
"cyclades.disk": 21474836480,
"cyclades.total_ram": 8589934592,
"cyclades.ram": 8589934592}}}
"cyclades.disk": 30 << 30,
"cyclades.total_ram": 8192 << 20,
"cyclades.ram": 8192 << 20}}}
holdings = util.get_db_holdings(user="user2")
self.assertEqual(holdings, user_holdings)
mfactory.VirtualMachineFactory(flavor=flavor, userid="user3",
operstate="STOPPED")
user_holdings = {"user3": {None: {"cyclades.vm": 1,
"cyclades.total_cpu": 24,
"cyclades.disk": 21474836480,
"cyclades.total_ram": 8589934592}}}
"cyclades.total_cpu": 24,
"cyclades.total_ram": 8589934592}}}
holdings = util.get_db_holdings(user="user3")
self.assertEqual(holdings, user_holdings)
......@@ -133,9 +135,13 @@ class ResolvePendingTestCase(TestCase):
class GetCommissionInfoTest(TestCase):
maxDiff = None
def test_commissions(self):
flavor = mfactory.FlavorFactory(cpu=2, ram=1024, disk=20)
vm = mfactory.VirtualMachineFactory(flavor=flavor)
mfactory.VolumeFactory(size=20, machine=vm, deleted=False,
delete_on_termination=True)
#commission = quotas.get_commission_info(vm, "BUILD")
#self.assertEqual({"cyclades.vm": 1,
# "cyclades.cpu": 2,
......@@ -170,8 +176,8 @@ class GetCommissionInfoTest(TestCase):
commission = quotas.get_commission_info(vm, "DESTROY")
self.assertEqual({"cyclades.vm": -1,
"cyclades.total_cpu": -2,
"cyclades.total_ram": 1048576 * -1024,
"cyclades.disk": 1073741824 * -20}, commission)
"cyclades.total_ram": -1024 << 20,
"cyclades.disk": -20 << 30}, commission)
commission = quotas.get_commission_info(vm, "RESIZE")
self.assertEqual(None, commission)
commission = quotas.get_commission_info(vm, "RESIZE",
......
......@@ -33,7 +33,7 @@
from django.db.models import Sum, Count, Q
from synnefo.db.models import VirtualMachine, Network, IPAddress
from synnefo.db.models import VirtualMachine, Network, IPAddress, Volume
from synnefo.quotas import Quotaholder
from collections import defaultdict
......@@ -50,11 +50,13 @@ def get_db_holdings(user=None, project=None):
vms = VirtualMachine.objects.filter(deleted=False)
networks = Network.objects.filter(deleted=False)
floating_ips = IPAddress.objects.filter(deleted=False, floating_ip=True)
volumes = Volume.objects.filter(deleted=False)
if user is not None:
vms = vms.filter(userid=user)
networks = networks.filter(userid=user)
floating_ips = floating_ips.filter(userid=user)
volumes = volumes.filter(userid=user)
if project is not None:
vms = vms.filter(project=project)
......@@ -65,15 +67,20 @@ def get_db_holdings(user=None, project=None):
vm_resources = vms.values("userid", "project")\
.annotate(num=Count("id"),
total_ram=Sum("flavor__ram"),
total_cpu=Sum("flavor__cpu"),
disk=Sum("flavor__disk"))
total_cpu=Sum("flavor__cpu"))
vm_active_resources = \
vms.values("userid")\
.filter(Q(operstate="STARTED") | Q(operstate="BUILD") |
Q(operstate="ERROR"))\
.annotate(ram=Sum("flavor__ram"),
cpu=Sum("flavor__cpu"))
for vm_res in vm_resources.iterator():
user = vm_res['userid']
project = vm_res['project']
res = {"cyclades.vm": vm_res["num"],
"cyclades.total_cpu": vm_res["total_cpu"],
"cyclades.disk": vm_res["disk"] * GiB,
"cyclades.total_ram": vm_res["total_ram"] * MiB}
holdings[user][project] = res
......@@ -89,6 +96,13 @@ def get_db_holdings(user=None, project=None):
holdings[user][project]["cyclades.cpu"] = vm_res["cpu"]
holdings[user][project]["cyclades.ram"] = vm_res["ram"] * MiB
# Get disk resource
disk_resources = volumes.values("userid").annotate(Sum("size"))
for disk_res in disk_resources.iterator():
user = disk_res["userid"]
project = vm_res['project']
holdings[user][project]["cyclades.disk"] = disk_res["size__sum"] * GiB
# Get resources related with networks
net_resources = networks.values("userid", "project")\
.annotate(num=Count("id"))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment