Commit 4b23893e authored by Christos Stavrakakis's avatar Christos Stavrakakis
Browse files

Merge branch 'feature-cyclades-commission-refactor' into develop

parents b9de34c3 c85c3508
......@@ -85,8 +85,6 @@ def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
if action == "BUILD":
# Quotas for new VMs are automatically accepted by the API
return vm
commission_info = quotas.get_commission_info(vm, action=action,
action_fields=job_fields)
if vm.task_job_id == job_id and vm.serial is not None:
# Commission for this change has already been issued. So just
......@@ -101,20 +99,24 @@ def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
serial)
quotas.reject_serial(serial)
vm.serial = None
elif job_status == rapi.JOB_STATUS_SUCCESS and commission_info is not None:
log.debug("Expected job was %s. Processing job %s. Commission for"
" this job: %s", vm.task_job_id, job_id, commission_info)
# Commission for this change has not been issued, or the issued
# commission was unaware of the current change. Reject all previous
# commissions and create a new one in forced mode!
commission_name = ("client: dispatcher, resource: %s, ganeti_job: %s"
% (vm, job_id))
quotas.handle_resource_commission(vm, action,
commission_info=commission_info,
commission_name=commission_name,
force=True,
auto_accept=True)
log.debug("Issued new commission: %s", vm.serial)
elif job_status == rapi.JOB_STATUS_SUCCESS:
commission_info = quotas.get_commission_info(resource=vm,
action=action,
action_fields=job_fields)
if commission_info is not None:
# Commission for this change has not been issued, or the issued
# commission was unaware of the current change. Reject all previous
# commissions and create a new one in forced mode!
log.debug("Expected job was %s. Processing job %s.",
vm.task_job_id, job_id)
reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
% (vm, job_id))
quotas.handle_resource_commission(vm, action,
action_fields=job_fields,
commission_name=reason,
force=True,
auto_accept=True)
log.debug("Issued new commission: %s", vm.serial)
return vm
......@@ -522,7 +524,7 @@ def update_network_state(network):
# Issue commission
if network.userid:
quotas.issue_and_accept_commission(network, delete=True)
quotas.issue_and_accept_commission(network, action="DESTROY")
# the above has already saved the object and committed;
# a second save would override others' changes, since the
# object is now unlocked
......
......@@ -176,7 +176,7 @@ def delete_floating_ip(floating_ip):
floating_ip.deleted = True
floating_ip.save()
# Release quota for floating IP
quotas.issue_and_accept_commission(floating_ip, delete=True)
quotas.issue_and_accept_commission(floating_ip, action="DESTROY")
transaction.commit()
# Delete the floating IP from DB
log.info("Deleted floating IP '%s' of user '%s", floating_ip,
......
......@@ -56,7 +56,7 @@ def validate_server_action(vm, action):
return
def server_command(action):
def server_command(action, action_fields=None):
"""Handle execution of a server action.
Helper function to validate and execute a server action, handle quota
......@@ -85,6 +85,7 @@ def server_command(action):
commission_name = "client: api, resource: %s" % vm
quotas.handle_resource_commission(vm, action=action,
action_fields=action_fields,
commission_name=commission_name)
vm.save()
......@@ -275,14 +276,19 @@ def reboot(vm, reboot_type):
return backend.reboot_instance(vm, reboot_type.lower())
@server_command("RESIZE")
def resize(vm, flavor):
action_fields = {"beparams": {"vcpus": flavor.cpu,
"maxmem": flavor.ram}}
comm = server_command("RESIZE", action_fields=action_fields)
return comm(_resize)(vm, flavor)
def _resize(vm, flavor):
old_flavor = vm.flavor
# User requested the same flavor
if old_flavor.id == flavor.id:
raise faults.BadRequest("Server '%s' flavor is already '%s'."
% (vm, flavor))
return None
# Check that resize can be performed
if old_flavor.disk != flavor.disk:
raise faults.BadRequest("Cannot resize instance disk.")
......@@ -290,13 +296,6 @@ def resize(vm, flavor):
raise faults.BadRequest("Cannot change instance disk template.")
log.info("Resizing VM from flavor '%s' to '%s", old_flavor, flavor)
commission_info = {"cyclades.cpu": flavor.cpu - old_flavor.cpu,
"cyclades.ram": 1048576 * (flavor.ram - old_flavor.ram)}
# Save serial to VM, since it is needed by server_command decorator
vm.serial = quotas.issue_commission(user=vm.userid,
source=quotas.DEFAULT_SOURCE,
provisions=commission_info,
name="resource: %s. resize" % vm)
return backend.resize_instance(vm, vcpus=flavor.cpu, memory=flavor.ram)
......
......@@ -32,6 +32,7 @@
from django.test import TransactionTestCase
#from snf_django.utils.testing import mocked_quotaholder
from synnefo.logic import servers
from synnefo import quotas
from synnefo.db import models_factory as mfactory, models
from mock import patch
......@@ -150,7 +151,8 @@ class ServerCommandTest(TransactionTestCase):
vm = mfactory.VirtualMachineFactory(operstate="STOPPED")
self.assertRaises(faults.BadRequest, servers.stop, vm)
vm = mfactory.VirtualMachineFactory(operstate="STARTED")
self.assertRaises(faults.BadRequest, servers.resize, vm)
flavor = mfactory.FlavorFactory()
self.assertRaises(faults.BadRequest, servers.resize, vm, flavor)
# Check that connect/disconnect is allowed only in STOPPED vms
# if hotplug is disabled.
vm = mfactory.VirtualMachineFactory(operstate="STARTED")
......@@ -167,6 +169,8 @@ class ServerCommandTest(TransactionTestCase):
vm.task = None
vm.task_job_id = None
vm.save()
with mocked_quotaholder():
quotas.accept_serial(vm.serial)
mrapi().RebootInstance.return_value = 1
with mocked_quotaholder():
servers.reboot(vm, "HARD")
......@@ -180,10 +184,8 @@ class ServerCommandTest(TransactionTestCase):
serial = vm.serial
mrapi().StartupInstance.return_value = 1
with mocked_quotaholder() as m:
servers.start(vm)
m.resolve_commissions.assert_called_once_with([],
[serial.serial])
self.assertTrue(m.issue_one_commission.called)
with self.assertRaises(quotas.ResolveError):
servers.start(vm)
# Not pending, rejct
vm.task = None
vm.serial = mfactory.QuotaHolderSerialFactory(serial=400,
......
......@@ -37,8 +37,7 @@ from synnefo.db.models import (QuotaHolderSerial, VirtualMachine, Network,
from synnefo.settings import (CYCLADES_SERVICE_TOKEN as ASTAKOS_TOKEN,
ASTAKOS_AUTH_URL)
from astakosclient import AstakosClient
from astakosclient.errors import AstakosClientException, QuotaLimit
from functools import wraps
from astakosclient import errors
import logging
log = logging.getLogger(__name__)
......@@ -63,30 +62,35 @@ class Quotaholder(object):
@classmethod
def get(cls):
if cls._object is None:
cls._object = AstakosClient(
ASTAKOS_TOKEN,
ASTAKOS_AUTH_URL,
use_pool=True,
retry=3,
logger=log)
cls._object = AstakosClient(ASTAKOS_TOKEN,
ASTAKOS_AUTH_URL,
use_pool=True,
retry=3,
logger=log)
return cls._object
def handle_astakosclient_error(func):
"""Decorator for converting astakosclient errors to 500."""
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except AstakosClientException:
log.exception("Unexpected error")
class AstakosClientExceptionHandler(object):
def __init__(self, *args, **kwargs):
pass
def __enter__(self):
pass
def __exit__(self, exc_type, value, traceback):
if value is not None: # exception
if not isinstance(value, errors.AstakosClientException):
return False # reraise
if exc_type is errors.QuotaLimit:
msg, details = render_overlimit_exception(value)
raise faults.OverLimit(msg, details=details)
log.exception("Unexpected error %s" % value.message)
raise faults.InternalServerError("Unexpected error")
return wrapper
@handle_astakosclient_error
def issue_commission(user, source, provisions, name="",
force=False, auto_accept=False):
def issue_commission(resource, action, name="", force=False, auto_accept=False,
action_fields=None):
"""Issue a new commission to the quotaholder.
Issue a new commission to the quotaholder, and create the
......@@ -94,18 +98,27 @@ def issue_commission(user, source, provisions, name="",
"""
provisions = get_commission_info(resource=resource, action=action,
action_fields=action_fields)
if provisions is None:
return None
user = resource.userid
source = DEFAULT_SOURCE
qh = Quotaholder.get()
try:
serial = qh.issue_one_commission(
user, source, provisions, name=name,
force=force, auto_accept=auto_accept)
except QuotaLimit as e:
msg, details = render_overlimit_exception(e)
raise faults.OverLimit(msg, details=details)
if True: # placeholder
with AstakosClientExceptionHandler():
serial = qh.issue_one_commission(user, source,
provisions, name=name,
force=force,
auto_accept=auto_accept)
if serial:
serial_info = {"serial": serial}
if auto_accept:
serial_info["pending"] = False
serial_info["accept"] = True
serial_info["resolved"] = True
return QuotaHolderSerial.objects.create(**serial_info)
......@@ -114,7 +127,9 @@ def issue_commission(user, source, provisions, name="",
def accept_serial(serial, strict=True):
assert serial.pending or serial.accept
response = resolve_commissions(accept=[serial.serial], strict=strict)
serial.pending = False
serial.accept = True
serial.resolved = True
serial.save()
......@@ -122,8 +137,10 @@ def accept_serial(serial, strict=True):
def reject_serial(serial, strict=True):
assert serial.pending or not serial.accept
response = resolve_commissions(reject=[serial.serial], strict=strict)
serial.reject = True
serial.pending = False
serial.accept = False
serial.resolved = True
serial.save()
return response
......@@ -137,7 +154,6 @@ def reject_commissions(rejected, strict=True):
return resolve_commissions(reject=rejected, strict=strict)
@handle_astakosclient_error
def resolve_commissions(accept=None, reject=None, strict=True):
if accept is None:
accept = []
......@@ -145,7 +161,8 @@ def resolve_commissions(accept=None, reject=None, strict=True):
reject = []
qh = Quotaholder.get()
response = qh.resolve_commissions(accept, reject)
with AstakosClientExceptionHandler():
response = qh.resolve_commissions(accept, reject)
if strict:
failed = response["failed"]
......@@ -221,89 +238,42 @@ def render_overlimit_exception(e):
return msg, details
@transaction.commit_manually
def issue_and_accept_commission(resource, delete=False):
@transaction.commit_on_success
def issue_and_accept_commission(resource, action="BUILD", action_fields=None):
"""Issue and accept a commission to Quotaholder.
This function implements the Commission workflow, and must be called
exactly after and in the same transaction that created/updated the
resource. The workflow that implements is the following:
0) Resolve previous unresolved commission if exists
1) Issue commission and get a serial
2) Store the serial in DB and mark is as one to accept
3) Correlate the serial with the resource
4) COMMIT!
5) Accept commission to QH (reject if failed until 5)
6) Mark serial as resolved
7) COMMIT!
1) Issue commission, get a serial and correlate it with the resource
2) Store the serial in DB as a serial to accept
3) COMMIT!
4) Accept commission to QH
"""
previous_serial = resource.serial
if previous_serial is not None and not previous_serial.resolved:
if previous_serial.pending:
msg = "Issuing commission for resource '%s' while previous serial"\
" '%s' is still pending." % (resource, previous_serial)
raise Exception(msg)
elif previous_serial.accept:
accept_serial(previous_serial, strict=False)
else:
reject_serial(previous_serial, strict=False)
try:
# Convert resources in the format expected by Quotaholder
qh_resources = prepare_qh_resources(resource)
if delete:
qh_resources = reverse_quantities(qh_resources)
# Issue commission and get the assigned serial
commission_reason = ("client: api, resource: %s, delete: %s"
% (resource, delete))
serial = issue_commission(user=resource.userid, source=DEFAULT_SOURCE,
provisions=qh_resources,
name=commission_reason)
except:
transaction.rollback()
raise
commission_reason = ("client: api, resource: %s, action: %s"
% (resource, action))
serial = handle_resource_commission(resource=resource, action=action,
action_fields=action_fields,
commission_name=commission_reason)
# Mark the serial as one to accept and associate it with the resource
serial.pending = False
serial.accept = True
serial.save()
transaction.commit()
try:
# Mark the serial as one to accept and associate it with the resource
serial.pending = False
serial.accept = True
serial.save()
resource.serial = serial
resource.save()
transaction.commit()
# Accept the commission to quotaholder
accept_serial(serial)
transaction.commit()
return serial
except:
log.exception("Unexpected ERROR")
transaction.rollback()
reject_serial(serial)
transaction.commit()
raise
# Do not crash if we can not accept commission to Quotaholder. Quotas
# have already been reserved and the resource already exists in DB.
# Just log the error
log.exception("Failed to accept commission: %s", serial)
def prepare_qh_resources(resource):
if isinstance(resource, VirtualMachine):
flavor = resource.flavor
return {'cyclades.vm': 1,
'cyclades.cpu': flavor.cpu,
'cyclades.active_cpu': flavor.cpu,
'cyclades.disk': 1073741824 * flavor.disk, # flavor.disk in GB
# 'public_ip': 1,
#'disk_template': flavor.disk_template,
# flavor.ram is in MB
'cyclades.ram': 1048576 * flavor.ram,
'cyclades.active_ram': 1048576 * flavor.ram}
elif isinstance(resource, Network):
return {"cyclades.network.private": 1}
elif isinstance(resource, IPAddress):
if resource.floating_ip:
return {"cyclades.floating_ip": 1}
else:
raise ValueError("Unknown Resource '%s'" % resource)
return serial
def get_commission_info(resource, action, action_fields=None):
......@@ -346,6 +316,21 @@ def get_commission_info(resource, action, action_fields=None):
else:
#["CONNECT", "DISCONNECT", "SET_FIREWALL_PROFILE"]:
return None
elif isinstance(resource, Network):
resources = {"cyclades.network.private": 1}
if action == "BUILD":
return resources
elif action == "DESTROY":
return reverse_quantities(resources)
elif isinstance(resource, IPAddress):
if resource.floating_ip:
resources = {"cyclades.floating_ip": 1}
if action == "BUILD":
return resources
elif action == "DESTROY":
return reverse_quantities(resources)
else:
return None
def reverse_quantities(resources):
......@@ -353,8 +338,8 @@ def reverse_quantities(resources):
def handle_resource_commission(resource, action, commission_name,
commission_info=None, force=False,
auto_accept=False):
force=False, auto_accept=False,
action_fields=None):
"""Handle a issuing of a commission for a resource.
Create a new commission for a resource based on the action that
......@@ -363,28 +348,26 @@ def handle_resource_commission(resource, action, commission_name,
"""
# Try to resolve previous serial
resolve_commission(resource.serial)
# Check if action is quotable and issue the corresponding commission
serial = None
if commission_info is None:
commission_info = get_commission_info(resource, action=action)
if commission_info is not None:
# Issue new commission, associate it with the resource
if commission_name is None:
commission_name = "client: api, resource %s" % resource
serial = issue_commission(user=resource.userid,
source=DEFAULT_SOURCE,
provisions=commission_info,
name=commission_name,
force=False,
auto_accept=False)
resolve_commission(resource.serial, force=force)
serial = issue_commission(resource, action, name=commission_name,
force=force, auto_accept=auto_accept,
action_fields=action_fields)
resource.serial = serial
resource.save()
return serial
class ResolveError(Exception):
pass
def resolve_commission(serial):
def resolve_commission(serial, force=False):
if serial is None or serial.resolved:
return
if serial.pending and not force:
m = "Could not resolve commission: serial %s is undecided" % serial
raise ResolveError(m)
log.warning("Resolving pending commission: %s", serial)
if not serial.pending and serial.accept:
accept_serial(serial)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment