Commit 960c99c8 authored by Giorgos Korfiatis's avatar Giorgos Korfiatis

Support commissions on multiple projects

An action on a resource can now trigger quota changes on multiple projects.
For example, when a VM is destroyed, its attached volumes (which may refer
to different projects) are destroyed too.

Astakosclient commission helper functions are changed to take provisions
on project/resource pairs.
parent 1693258b
......@@ -642,14 +642,13 @@ class AstakosClient(object):
return self._issue_commission(request)
def issue_one_commission(self, holder, source, provisions,
def issue_one_commission(self, holder, provisions,
name="", force=False, auto_accept=False):
"""Issue one commission (with specific holder and source)
keyword arguments:
holder -- user's id (string)
source -- commission's source (ex system) (string)
provisions -- resources with their quantity (dict from string to int)
provisions -- (source, resource) mapping to quantity
name -- description of the commission (string)
force -- force this commission (boolean)
auto_accept -- auto accept this commission (boolean)
......@@ -659,8 +658,7 @@ class AstakosClient(object):
"""
check_input("issue_one_commission", self.logger,
holder=holder, source=source,
provisions=provisions)
holder=holder, provisions=provisions)
request = {}
request["force"] = force
......@@ -668,7 +666,7 @@ class AstakosClient(object):
request["name"] = name
try:
request["provisions"] = []
for resource, quantity in provisions.iteritems():
for (source, resource), quantity in provisions.iteritems():
ps = self.mk_provisions(holder, source, resource, quantity)
request["provisions"].extend(ps)
except Exception as err:
......@@ -677,8 +675,7 @@ class AstakosClient(object):
return self._issue_commission(request)
def issue_resource_reassignment(self, holder, from_source,
to_source, provisions, name="",
def issue_resource_reassignment(self, holder, provisions, name="",
force=False, auto_accept=False):
"""Change resource assignment to another project
"""
......@@ -690,7 +687,8 @@ class AstakosClient(object):
try:
request["provisions"] = []
for resource, quantity in provisions.iteritems():
for key, quantity in provisions.iteritems():
(from_source, to_source, resource) = key
ps = self.mk_provisions(
holder, from_source, resource, -quantity)
ps += self.mk_provisions(holder, to_source, resource, quantity)
......
......@@ -793,7 +793,8 @@ class TestCommissions(unittest.TestCase):
client = AstakosClient(token['id'], auth_url)
response = client.issue_one_commission(
"c02f315b-7d84-45bc-a383-552a3f97d2ad",
"system", {"cyclades.vm": 1, "cyclades.ram": 30000})
{("system", "cyclades.vm"): 1,
("system", "cyclades.ram"): 30000})
except Exception as err:
self.fail("Shouldn't have raised Exception %s" % err)
self.assertEqual(response, commission_successful_response['serial'])
......
......@@ -160,7 +160,7 @@ retry=0, use_pool=False, pool_size=8, logger=None\ **)**
commission's id (int). Otherwise it raises an AstakosClientException
exception.
**issue_resource_reassignment(**\ holder, from_source, to_source, provisions, name="", force=False, auto_accept=False\ **)**
**issue_resource_reassignment(**\ holder, provisions, name="", force=False, auto_accept=False\ **)**
Change resource assignment to another project
**get_pending_commissions()**
......
......@@ -81,8 +81,9 @@ class NetworkTest(BaseAPITest):
# TEST QUOTAS!!!
name, args, kwargs =\
self.mocked_quotaholder.issue_one_commission.mock_calls[0]
commission_resources = args[2]
self.assertEqual(commission_resources, {"cyclades.network.private": 1})
commission_resources = args[1]
self.assertEqual(commission_resources,
{("user", "cyclades.network.private"): 1})
name, args, kwargs =\
self.mocked_quotaholder.resolve_commissions.mock_calls[0]
serial = QuotaHolderSerial.objects.order_by("-serial")[0]
......
......@@ -15,7 +15,6 @@
from django.utils import simplejson as json
from django.db import transaction
from django.db.models import Sum
from snf_django.lib.api import faults
from synnefo.db.models import (QuotaHolderSerial, VirtualMachine, Network,
......@@ -26,6 +25,7 @@ from synnefo.settings import (CYCLADES_SERVICE_TOKEN as ASTAKOS_TOKEN,
from astakosclient import AstakosClient
from astakosclient import errors
from collections import defaultdict
import logging
log = logging.getLogger(__name__)
......@@ -115,27 +115,27 @@ def issue_commission(resource, action, name="", force=False, auto_accept=False,
return None
user = resource.userid
source = resource.project
projects = set(p for (p, r) in provisions.keys())
qh = Quotaholder.get()
if action == "REASSIGN":
try:
from_project = action_fields["from_project"]
to_project = action_fields["to_project"]
except KeyError:
raise Exception("Missing project attribute.")
projects = [from_project, to_project]
ext_provisions = {}
for (project, res), quantity in provisions.iteritems():
ext_provisions[(from_project, project, res)] = quantity
projects.add(from_project)
with AstakosClientExceptionHandler(user=user, projects=projects):
serial = qh.issue_resource_reassignment(user,
from_project, to_project,
provisions, name=name,
serial = qh.issue_resource_reassignment(user, ext_provisions,
name=name,
force=force,
auto_accept=auto_accept)
else:
with AstakosClientExceptionHandler(user=user, projects=[source]):
serial = qh.issue_one_commission(user, source,
provisions, name=name,
with AstakosClientExceptionHandler(user=user, projects=projects):
serial = qh.issue_one_commission(user, provisions, name=name,
force=force,
auto_accept=auto_accept)
......@@ -316,19 +316,30 @@ def issue_and_accept_commission(resource, action="BUILD", action_fields=None):
log.exception("Failed to accept commission: %s", resource.serial)
def get_volume_resources(volumes):
resources = defaultdict(lambda: 0)
for volume in volumes:
volproj = volume.project
resources[(volproj, "cyclades.disk")] += int(volume.size) << 30
return resources
def get_commission_info(resource, action, action_fields=None):
project = resource.project
if isinstance(resource, VirtualMachine):
resources = defaultdict(lambda: 0)
flavor = resource.flavor
resources = {"cyclades.vm": 1,
"cyclades.total_cpu": flavor.cpu,
"cyclades.total_ram": flavor.ram << 20}
online_resources = {"cyclades.cpu": flavor.cpu,
"cyclades.ram": flavor.ram << 20}
offline_resources = {(project, "cyclades.vm"): 1,
(project, "cyclades.total_cpu"): flavor.cpu,
(project, "cyclades.total_ram"): flavor.ram << 20,
}
online_resources = {(project, "cyclades.cpu"): flavor.cpu,
(project, "cyclades.ram"): flavor.ram << 20}
if action == "BUILD":
new_volumes = resource.volumes.filter(status="CREATING")
new_volumes_size = new_volumes.aggregate(Sum("size"))["size__sum"]
resources["cyclades.disk"] = int(new_volumes_size) << 30
resources.update(offline_resources)
resources.update(online_resources)
resources.update(get_volume_resources(new_volumes))
return resources
if action == "START":
if resource.operstate == "STOPPED":
......@@ -347,10 +358,8 @@ def get_commission_info(resource, action, action_fields=None):
return None
elif action == "DESTROY":
volumes = resource.volumes.filter(deleted=False)
volumes_size = volumes.aggregate(Sum("size"))["size__sum"]
if volumes_size is None:
volumes_size = 0
resources["cyclades.disk"] = int(volumes_size) << 30
resources.update(offline_resources)
resources.update(get_volume_resources(volumes))
if resource.operstate in ["STARTED", "BUILD", "ERROR"]:
resources.update(online_resources)
return reverse_quantities(resources)
......@@ -358,9 +367,10 @@ def get_commission_info(resource, action, action_fields=None):
beparams = action_fields.get("beparams")
cpu = beparams.get("vcpus", flavor.cpu)
ram = beparams.get("maxmem", flavor.ram)
return {"cyclades.total_cpu": cpu - flavor.cpu,
"cyclades.total_ram": (ram - flavor.ram) << 20}
return {(project, "cyclades.total_cpu"): cpu - flavor.cpu,
(project, "cyclades.total_ram"): (ram - flavor.ram) << 20}
elif action == "REASSIGN":
resources.update(offline_resources)
if resource.operstate in ["STARTED", "BUILD", "ERROR"]:
resources.update(online_resources)
return resources
......@@ -368,14 +378,16 @@ def get_commission_info(resource, action, action_fields=None):
if action_fields is not None:
volumes_changes = action_fields.get("disks")
if volumes_changes is not None:
size_delta = get_volumes_size_delta(volumes_changes)
if size_delta:
return {"cyclades.disk": size_delta << 30}
for action, db_volume, info in volumes_changes:
project = db_volume.project
resources[(project, "cyclades.disk")] += \
get_volume_size_delta(action, db_volume, info)
return resources
else:
#["CONNECT", "DISCONNECT", "SET_FIREWALL_PROFILE"]:
return None
elif isinstance(resource, Network):
resources = {"cyclades.network.private": 1}
resources = {(project, "cyclades.network.private"): 1}
if action == "BUILD":
return resources
elif action == "DESTROY":
......@@ -384,7 +396,7 @@ def get_commission_info(resource, action, action_fields=None):
return resources
elif isinstance(resource, IPAddress):
if resource.floating_ip:
resources = {"cyclades.floating_ip": 1}
resources = {(project, "cyclades.floating_ip"): 1}
if action == "BUILD":
return resources
elif action == "DESTROY":
......@@ -395,29 +407,25 @@ def get_commission_info(resource, action, action_fields=None):
return None
elif isinstance(resource, Volume):
size = resource.size
resources = {"cyclades.disk": size << 30}
resources = {(project, "cyclades.disk"): size << 30}
if resource.status == "CREATING" and action == "BUILD":
return resources
elif action == "DESTROY":
reverse_quantities(resources)
return reverse_quantities(resources)
else:
return None
def get_volumes_size_delta(volumes_changes):
"""Compute the total change in the size of volumes"""
size_delta = 0
for vchange in volumes_changes:
action, db_volume, info = vchange
if action == "add":
size_delta += int(db_volume.size)
elif action == "remove":
size_delta -= int(db_volume.size)
elif action == "modify":
size_delta += info.get("size_delta", 0)
else:
raise ValueError("Unknown volume action '%s'" % action)
return size_delta
def get_volume_size_delta(action, db_volume, info):
"""Compute the change in the size of a volume"""
if action == "add":
return int(db_volume.size) << 30
elif action == "remove":
return -int(db_volume.size) << 30
elif action == "modify":
return info.get("size_delta", 0) << 30
else:
raise ValueError("Unknown volume action '%s'" % action)
def reverse_quantities(resources):
......
......@@ -132,45 +132,46 @@ class GetCommissionInfoTest(TestCase):
# "cyclades.disk": 1073741824 * 20}, commission)
vm.operstate = "STARTED"
vm.save()
project = vm.project
commission = quotas.get_commission_info(vm, "STOP")
self.assertEqual({"cyclades.cpu": -2,
"cyclades.ram": 1048576 * -1024}, commission)
self.assertEqual({(project, "cyclades.cpu"): -2,
(project, "cyclades.ram"): 1048576 * -1024}, commission)
# Check None quotas if vm is already stopped
vm.operstate = "STOPPED"
vm.save()
commission = quotas.get_commission_info(vm, "STOP")
self.assertEqual(None, commission)
commission = quotas.get_commission_info(vm, "START")
self.assertEqual({"cyclades.cpu": 2,
"cyclades.ram": 1048576 * 1024}, commission)
self.assertEqual({(project, "cyclades.cpu"): 2,
(project, "cyclades.ram"): 1048576 * 1024}, commission)
vm.operstate = "STARTED"
vm.save()
commission = quotas.get_commission_info(vm, "DESTROY")
self.assertEqual({"cyclades.vm": -1,
"cyclades.total_cpu": -2,
"cyclades.cpu": -2,
"cyclades.total_ram": 1048576 * -1024,
"cyclades.ram": 1048576 * -1024,
"cyclades.disk": 1073741824 * -20}, commission)
self.assertEqual({(project, "cyclades.vm"): -1,
(project, "cyclades.total_cpu"): -2,
(project, "cyclades.cpu"): -2,
(project, "cyclades.total_ram"): 1048576 * -1024,
(project, "cyclades.ram"): 1048576 * -1024,
(project, "cyclades.disk"): 1073741824 * -20}, commission)
vm.operstate = "STOPPED"
vm.save()
commission = quotas.get_commission_info(vm, "DESTROY")
self.assertEqual({"cyclades.vm": -1,
"cyclades.total_cpu": -2,
"cyclades.total_ram": -1024 << 20,
"cyclades.disk": -20 << 30}, commission)
self.assertEqual({(project, "cyclades.vm"): -1,
(project, "cyclades.total_cpu"): -2,
(project, "cyclades.total_ram"): -1024 << 20,
(project, "cyclades.disk"): -20 << 30}, commission)
commission = quotas.get_commission_info(vm, "RESIZE")
self.assertEqual(None, commission)
commission = quotas.get_commission_info(vm, "RESIZE",
{"beparams": {"vcpus": 4,
"maxmem": 2048}})
self.assertEqual({"cyclades.total_cpu": 2,
"cyclades.total_ram": 1048576 * 1024}, commission)
self.assertEqual({(project, "cyclades.total_cpu"): 2,
(project, "cyclades.total_ram"): 1048576 * 1024}, commission)
vm.operstate = "STOPPED"
vm.save()
commission = quotas.get_commission_info(vm, "REBOOT")
self.assertEqual({"cyclades.cpu": 2,
"cyclades.ram": 1048576 * 1024}, commission)
self.assertEqual({(project, "cyclades.cpu"): 2,
(project, "cyclades.ram"): 1048576 * 1024}, commission)
vm.operstate = "STARTED"
vm.save()
commission = quotas.get_commission_info(vm, "REBOOT")
......
......@@ -665,15 +665,17 @@ class ModularBackend(BaseBackend):
path, node = self._lookup_container(account, container)
if PROJECT in policy:
project = self._get_project(node)
from_project = self._get_project(node)
to_project = policy[PROJECT]
provisions = {
(from_project, to_project, 'pithos.diskspace'):
self.get_container_meta(
user, account, container,
include_user_defined=False)['bytes']}
try:
serial = self.astakosclient.issue_resource_reassignment(
holder=account,
from_source=project,
to_source=policy[PROJECT],
provisions={'pithos.diskspace': self.get_container_meta(
user, account, container,
include_user_defined=False)['bytes']})
holder=account, provisions=provisions)
except BaseException, e:
raise QuotaError(e)
else:
......@@ -1873,8 +1875,7 @@ class ModularBackend(BaseBackend):
name = details['path'] if 'path' in details else ''
serial = self.astakosclient.issue_one_commission(
holder=account,
source=source,
provisions={'pithos.diskspace': size},
provisions={(source, 'pithos.diskspace'): size},
name=name)
except BaseException, e:
raise QuotaError(e)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment