__init__.py 13.8 KB
Newer Older
1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
Christos Stavrakakis's avatar
Christos Stavrakakis committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#   1. Redistributions of source code must retain the above copyright
#      notice, this list of conditions and the following disclaimer.
#
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of GRNET S.A.

Christos Stavrakakis's avatar
Christos Stavrakakis committed
30
from django.utils import simplejson as json
31
from django.db import transaction
Christos Stavrakakis's avatar
Christos Stavrakakis committed
32

33
from snf_django.lib.api import faults
34
from synnefo.db.models import (QuotaHolderSerial, VirtualMachine, Network,
35
                               IPAddress)
36

37
from synnefo.settings import (CYCLADES_SERVICE_TOKEN as ASTAKOS_TOKEN,
38
                              ASTAKOS_AUTH_URL)
39
from astakosclient import AstakosClient
40
from astakosclient import errors
Christos Stavrakakis's avatar
Christos Stavrakakis committed
41 42 43 44

import logging
log = logging.getLogger(__name__)

45

46 47 48
QUOTABLE_RESOURCES = [VirtualMachine, Network, IPAddress]


Christos Stavrakakis's avatar
Christos Stavrakakis committed
49 50
RESOURCES = [
    "cyclades.vm",
51
    "cyclades.total_cpu",
Christos Stavrakakis's avatar
Christos Stavrakakis committed
52 53
    "cyclades.cpu",
    "cyclades.disk",
54
    "cyclades.total_ram",
Christos Stavrakakis's avatar
Christos Stavrakakis committed
55
    "cyclades.ram",
56 57
    "cyclades.network.private",
    "cyclades.floating_ip",
Christos Stavrakakis's avatar
Christos Stavrakakis committed
58
]
Christos Stavrakakis's avatar
Christos Stavrakakis committed
59 60


61 62 63 64 65 66
class Quotaholder(object):
    _object = None

    @classmethod
    def get(cls):
        if cls._object is None:
67 68 69 70 71
            cls._object = AstakosClient(ASTAKOS_TOKEN,
                                        ASTAKOS_AUTH_URL,
                                        use_pool=True,
                                        retry=3,
                                        logger=log)
72
        return cls._object
Christos Stavrakakis's avatar
Christos Stavrakakis committed
73 74


75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
class AstakosClientExceptionHandler(object):
    def __init__(self, *args, **kwargs):
        pass

    def __enter__(self):
        pass

    def __exit__(self, exc_type, value, traceback):
        if value is not None:  # exception
            if not isinstance(value, errors.AstakosClientException):
                return False  # reraise
            if exc_type is errors.QuotaLimit:
                msg, details = render_overlimit_exception(value)
                raise faults.OverLimit(msg, details=details)

            log.exception("Unexpected error %s" % value.message)
91 92 93
            raise faults.InternalServerError("Unexpected error")


94 95
def issue_commission(resource, action, name="", force=False, auto_accept=False,
                     action_fields=None):
Christos Stavrakakis's avatar
Christos Stavrakakis committed
96 97 98 99 100 101 102
    """Issue a new commission to the quotaholder.

    Issue a new commission to the quotaholder, and create the
    corresponing QuotaHolderSerial object in DB.

    """

103 104 105 106 107 108 109
    provisions = get_commission_info(resource=resource, action=action,
                                     action_fields=action_fields)

    if provisions is None:
        return None

    user = resource.userid
110
    source = resource.project
111

112
    qh = Quotaholder.get()
113 114
    if True:  # placeholder
        with AstakosClientExceptionHandler():
115 116 117 118
            serial = qh.issue_one_commission(user, source,
                                             provisions, name=name,
                                             force=force,
                                             auto_accept=auto_accept)
Christos Stavrakakis's avatar
Christos Stavrakakis committed
119

120
    if not serial:
121
        raise Exception("No serial")
Christos Stavrakakis's avatar
Christos Stavrakakis committed
122

123 124 125 126 127 128 129 130 131 132 133 134 135
    serial_info = {"serial": serial}
    if auto_accept:
        serial_info["pending"] = False
        serial_info["accept"] = True
        serial_info["resolved"] = True

    serial = QuotaHolderSerial.objects.create(**serial_info)

    # Correlate the serial with the resource. Resolved serials are not
    # attached to resources
    if not auto_accept:
        resource.serial = serial
        resource.save()
Christos Stavrakakis's avatar
Christos Stavrakakis committed
136

137 138 139 140 141
    return serial


def accept_resource_serial(resource, strict=True):
    serial = resource.serial
142
    assert serial.pending or serial.accept, "%s can't be accepted" % serial
143 144 145 146 147
    log.debug("Accepting serial %s of resource %s", serial, resource)
    _resolve_commissions(accept=[serial.serial], strict=strict)
    resource.serial = None
    resource.save()
    return resource
148 149


150 151
def reject_resource_serial(resource, strict=True):
    serial = resource.serial
152
    assert serial.pending or not serial.accept, "%s can't be rejected" % serial
153 154 155 156 157
    log.debug("Rejecting serial %s of resource %s", serial, resource)
    _resolve_commissions(reject=[serial.serial], strict=strict)
    resource.serial = None
    resource.save()
    return resource
158 159


160
def _resolve_commissions(accept=None, reject=None, strict=True):
161 162 163 164
    if accept is None:
        accept = []
    if reject is None:
        reject = []
165

166
    qh = Quotaholder.get()
167 168
    with AstakosClientExceptionHandler():
        response = qh.resolve_commissions(accept, reject)
169

170 171 172 173 174 175 176 177 178
    accepted = response.get("accepted", [])
    rejected = response.get("rejected", [])

    if accepted:
        QuotaHolderSerial.objects.filter(serial__in=accepted).update(
            accept=True, pending=False, resolved=True)
    if rejected:
        QuotaHolderSerial.objects.filter(serial__in=rejected).update(
            accept=False, pending=False, resolved=True)
179

180 181 182 183 184 185 186
    if strict:
        failed = response["failed"]
        if failed:
            log.error("Unexpected error while resolving commissions: %s",
                      failed)

    return response
187 188


189 190 191 192 193 194 195 196 197
def reconcile_resolve_commissions(accept=None, reject=None, strict=True):
    response = _resolve_commissions(accept=accept,
                                    reject=reject,
                                    strict=strict)
    affected = response.get("accepted", []) + response.get("rejected", [])
    for resource in QUOTABLE_RESOURCES:
        resource.objects.filter(serial__in=affected).update(serial=None)


198 199 200 201 202 203 204
def resolve_pending_commissions():
    """Resolve quotaholder pending commissions.

    Get pending commissions from the quotaholder and resolve them
    to accepted and rejected, according to the state of the
    QuotaHolderSerial DB table. A pending commission in the quotaholder
    can exist in the QuotaHolderSerial table and be either accepted or
205
    rejected, or cannot exist in this table, so it is rejected.
206 207 208 209 210 211 212 213 214 215 216

    """

    qh_pending = get_quotaholder_pending()
    if not qh_pending:
        return ([], [])

    qh_pending.sort()
    min_ = qh_pending[0]

    serials = QuotaHolderSerial.objects.filter(serial__gte=min_, pending=False)
217
    accepted = serials.filter(accept=True).values_list('serial', flat=True)
218 219 220 221 222 223 224 225
    accepted = filter(lambda x: x in qh_pending, accepted)

    rejected = list(set(qh_pending) - set(accepted))

    return (accepted, rejected)


def get_quotaholder_pending():
226
    qh = Quotaholder.get()
227
    pending_serials = qh.get_pending_commissions()
228
    return pending_serials
229 230


231
def render_overlimit_exception(e):
232 233 234
    resource_name = {"vm": "Virtual Machine",
                     "cpu": "CPU",
                     "ram": "RAM",
235 236
                     "network.private": "Private Network",
                     "floating_ip": "Floating IP address"}
Christos Stavrakakis's avatar
Christos Stavrakakis committed
237
    details = json.loads(e.details)
238
    data = details['overLimit']['data']
Christos Stavrakakis's avatar
Christos Stavrakakis committed
239 240 241
    usage = data["usage"]
    limit = data["limit"]
    available = limit - usage
242 243 244 245
    provision = data['provision']
    requested = provision['quantity']
    resource = provision['resource']
    res = resource.replace("cyclades.", "", 1)
246 247 248 249 250 251 252
    try:
        resource = resource_name[res]
    except KeyError:
        resource = res

    msg = "Resource Limit Exceeded for your account."
    details = "Limit for resource '%s' exceeded for your account."\
253 254
              " Available: %s, Requested: %s"\
              % (resource, available, requested)
255
    return msg, details
256 257


258
@transaction.commit_on_success
259
def issue_and_accept_commission(resource, action="BUILD", action_fields=None):
260 261 262 263 264
    """Issue and accept a commission to Quotaholder.

    This function implements the Commission workflow, and must be called
    exactly after and in the same transaction that created/updated the
    resource. The workflow that implements is the following:
265
    0) Resolve previous unresolved commission if exists
266 267 268 269
    1) Issue commission, get a serial and correlate it with the resource
    2) Store the serial in DB as a serial to accept
    3) COMMIT!
    4) Accept commission to QH
270 271

    """
272 273
    commission_reason = ("client: api, resource: %s, action: %s"
                         % (resource, action))
274
    serial = handle_resource_commission(resource=resource, action=action,
275
                                        action_fields=action_fields,
276
                                        commission_name=commission_reason)
277

278 279 280
    if serial is None:
        return

281 282 283 284 285
    # Mark the serial as one to accept and associate it with the resource
    serial.pending = False
    serial.accept = True
    serial.save()
    transaction.commit()
286 287

    try:
288
        # Accept the commission to quotaholder
289
        accept_resource_serial(resource)
290
    except:
291 292 293
        # Do not crash if we can not accept commission to Quotaholder. Quotas
        # have already been reserved and the resource already exists in DB.
        # Just log the error
294
        log.exception("Failed to accept commission: %s", resource.serial)
295 296


297 298 299 300
def get_commission_info(resource, action, action_fields=None):
    if isinstance(resource, VirtualMachine):
        flavor = resource.flavor
        resources = {"cyclades.vm": 1,
301
                     "cyclades.total_cpu": flavor.cpu,
302
                     "cyclades.disk": 1073741824 * flavor.disk,
303 304 305
                     "cyclades.total_ram": 1048576 * flavor.ram}
        online_resources = {"cyclades.cpu": flavor.cpu,
                            "cyclades.ram": 1048576 * flavor.ram}
306 307 308
        if action == "BUILD":
            resources.update(online_resources)
            return resources
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331
        if action == "START":
            if resource.operstate == "STOPPED":
                return online_resources
            else:
                return None
        elif action == "STOP":
            if resource.operstate in ["STARTED", "BUILD", "ERROR"]:
                return reverse_quantities(online_resources)
            else:
                return None
        elif action == "REBOOT":
            if resource.operstate == "STOPPED":
                return online_resources
            else:
                return None
        elif action == "DESTROY":
            if resource.operstate in ["STARTED", "BUILD", "ERROR"]:
                resources.update(online_resources)
            return reverse_quantities(resources)
        elif action == "RESIZE" and action_fields:
            beparams = action_fields.get("beparams")
            cpu = beparams.get("vcpus", flavor.cpu)
            ram = beparams.get("maxmem", flavor.ram)
332 333
            return {"cyclades.total_cpu": cpu - flavor.cpu,
                    "cyclades.total_ram": 1048576 * (ram - flavor.ram)}
334 335 336
        else:
            #["CONNECT", "DISCONNECT", "SET_FIREWALL_PROFILE"]:
            return None
337 338 339 340 341 342
    elif isinstance(resource, Network):
        resources = {"cyclades.network.private": 1}
        if action == "BUILD":
            return resources
        elif action == "DESTROY":
            return reverse_quantities(resources)
343 344 345 346 347 348 349 350 351
    elif isinstance(resource, IPAddress):
        if resource.floating_ip:
            resources = {"cyclades.floating_ip": 1}
            if action == "BUILD":
                return resources
            elif action == "DESTROY":
                return reverse_quantities(resources)
        else:
            return None
352 353


354 355
def reverse_quantities(resources):
    return dict((r, -s) for r, s in resources.items())
356 357


358
def handle_resource_commission(resource, action, commission_name,
359 360
                               force=False, auto_accept=False,
                               action_fields=None):
361 362 363 364 365 366 367
    """Handle a issuing of a commission for a resource.

    Create a new commission for a resource based on the action that
    is performed. If the resource has a previous pending commission,
    resolved it before issuing the new one.

    """
368 369 370 371 372 373
    # Try to resolve previous serial:
    # If action is DESTROY, we must always reject the previous commission,
    # since multiple DESTROY actions are allowed in the same resource (e.g. VM)
    # The one who succeeds will be finally accepted, and all other will be
    # rejected
    force = force or (action == "DESTROY")
374
    resolve_resource_commission(resource, force=force)
375

376 377 378
    serial = issue_commission(resource, action, name=commission_name,
                              force=force, auto_accept=auto_accept,
                              action_fields=action_fields)
379
    return serial
380 381


382 383 384 385
class ResolveError(Exception):
    pass


386 387
def resolve_resource_commission(resource, force=False):
    serial = resource.serial
388 389
    if serial is None or serial.resolved:
        return
390 391 392
    if serial.pending and not force:
        m = "Could not resolve commission: serial %s is undecided" % serial
        raise ResolveError(m)
393 394
    log.warning("Resolving pending commission: %s", serial)
    if not serial.pending and serial.accept:
395
        accept_resource_serial(resource)
396
    else:
397
        reject_resource_serial(resource)