callpoint.py 10.5 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

34
from datetime import datetime
35
from django.db.models import Q
36
from astakos.quotaholder_app.exception import (
37
    QuotaholderError,
38
    NoCommissionError,
39
    CorruptedError, InvalidDataError,
40
    NoHoldingError,
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
41
    DuplicateError)
42

43
from astakos.quotaholder_app.commission import (
44
    Import, Release, Operations, finalize, undo)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
45

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
46
from astakos.quotaholder_app.models import (
47
    Holding, Commission, Provision, ProvisionLog)
48

49

50 51 52 53
def format_datetime(d):
    """Render ``d`` as an ISO-like timestamp string.

    The value is first formatted as ``YYYY-MM-DDTHH:MM:SS.ffffff`` and the
    result is then cut to its first 24 characters, which for a four-digit
    year leaves four fractional-second digits.
    """
    full_stamp = d.strftime('%Y-%m-%dT%H:%M:%S.%f')
    return full_stamp[:24]


54 55 56 57 58
def get_quota(holders=None, sources=None, resources=None, flt=None):
    """Return the quota values of the holdings matching the given criteria.

    ``flt`` is an optional ``Q`` object applied first (an empty ``Q()`` is
    used when it is ``None``).  ``holders``, ``sources`` and ``resources``
    each restrict the result to holdings whose corresponding field is in the
    given collection; ``None`` leaves that field unrestricted.

    Returns a dict mapping ``(holder, source, resource)`` to
    ``(limit, usage_min, usage_max)``.
    """
    holdings = Holding.objects.filter(Q() if flt is None else flt)

    # Apply the optional restrictions in a fixed order, one filter() each.
    restrictions = (
        ('holder__in', holders),
        ('source__in', sources),
        ('resource__in', resources),
    )
    for lookup, allowed in restrictions:
        if allowed is not None:
            holdings = holdings.filter(**{lookup: allowed})

    return dict(
        ((h.holder, h.source, h.resource),
         (h.limit, h.usage_min, h.usage_max))
        for h in holdings)
76 77


78 79 80 81 82 83 84
def delete_quota(keys):
    """Delete the holdings identified by ``keys``.

    ``keys`` is an iterable of ``(holder, source, resource)`` triples; one
    filtered delete is issued per triple.
    """
    for key in keys:
        holder, source, resource = key
        matching = Holding.objects.filter(
            holder=holder, source=source, resource=resource)
        matching.delete()


85 86
def _get_holdings_for_update(holding_keys, resource=None, delete=False):
    """Select for update and return the holdings named by ``holding_keys``.

    Every holding that belongs to a holder appearing in ``holding_keys``
    (optionally narrowed to ``resource``) is selected with
    ``select_for_update`` in primary-key order.  Holdings whose
    ``(holder, source, resource)`` triple is one of ``holding_keys`` are
    returned in a dict keyed by that triple.

    When ``delete`` is true, all selected rows are deleted and the ones that
    were selected but not requested are re-inserted with ``bulk_create``;
    the requested ones are returned without being re-inserted here.
    """
    resource_filter = Q() if resource is None else Q(resource=resource)
    wanted_holders = set(hld for (hld, _src, _res) in holding_keys)
    candidates = Holding.objects.filter(
        resource_filter, holder__in=wanted_holders).order_by('pk')
    locked = candidates.select_for_update()

    wanted = set(holding_keys)
    found = {}
    unrequested = []
    for holding in locked:
        triple = (holding.holder, holding.source, holding.resource)
        if triple not in wanted:
            # Same holder, but a source/resource nobody asked for.
            unrequested.append(holding)
            continue
        found[triple] = holding

    if delete:
        candidates.delete()
        Holding.objects.bulk_create(unrequested)
    return found
105 106


107 108 109 110 111 112 113 114 115
def _mkProvision(key, quantity):
    holder, source, resource = key
    return {'holder': holder,
            'source': source,
            'resource': resource,
            'quantity': quantity,
            }


116
def set_quota(quotas, resource=None):
    """Set the limits of the holdings listed in ``quotas``.

    ``quotas`` is a sequence of ``((holder, source, resource), limit)``
    pairs; it is iterated twice.  The existing holdings are fetched through
    ``_get_holdings_for_update`` with ``delete=True`` and a fresh ``Holding``
    is built for each pair, carrying over ``usage_min``, ``usage_max`` and
    ``id`` from the previous holding when one was found.  When ``resource``
    is given, pairs that refer to a different resource are skipped.  The
    new holdings are inserted with a single ``bulk_create``; if a key
    appears more than once, the last pair wins.
    """
    requested_keys = [k for (k, _limit) in quotas]
    previous = _get_holdings_for_update(
        requested_keys, resource=resource, delete=True)

    replacements = {}
    for key, limit in quotas:
        holder, source, res = key
        if resource is not None and res != resource:
            continue

        fresh = Holding(holder=holder, source=source, resource=res,
                        limit=limit)
        if key in previous:
            # Keep the usage counters and primary key of the old row.
            old = previous[key]
            fresh.usage_min = old.usage_min
            fresh.usage_max = old.usage_max
            fresh.id = old.id
        replacements[key] = fresh

    Holding.objects.bulk_create(replacements.values())
140

141

142
def issue_commission(clientkey, provisions, name="", force=False):
    """Issue a pending commission consisting of ``provisions``.

    ``provisions`` is a sequence of ``((holder, source, resource), quantity)``
    pairs; it is iterated twice.  A non-negative quantity is prepared as an
    ``Import`` (passing ``force`` through); a negative quantity is prepared
    as a ``Release`` of its absolute value, never forced.  Once every
    provision has been prepared, a ``Commission`` row and one ``Provision``
    row per pair are created.

    Returns the serial of the new commission.

    Raises:
        InvalidDataError: a quantity is not an ``int``/``long``.
        DuplicateError: the same key appears more than once.
        NoHoldingError: no holding exists for a key.

    Any ``QuotaholderError`` raised while preparing causes the operations
    prepared so far to be reverted before the error is re-raised.
    """
    operations = Operations()
    provisions_to_create = []

    keys = [key for (key, value) in provisions]
    holdings = _get_holdings_for_update(keys)
    try:
        # Keys are used as dict keys below, so they are hashable and a set
        # gives constant-time duplicate detection.
        seen = set()
        for key, quantity in provisions:
            if not isinstance(quantity, (int, long)):
                raise InvalidDataError("Malformed provision")

            if key in seen:
                m = "Duplicate provision for %s" % str(key)
                provision = _mkProvision(key, quantity)
                raise DuplicateError(m,
                                     provision=provision)
            seen.add(key)

            # Target
            try:
                th = holdings[key]
            except KeyError:
                m = ("There is no such holding %s" % str(key))
                provision = _mkProvision(key, quantity)
                raise NoHoldingError(m,
                                     provision=provision)

            if quantity >= 0:
                operations.prepare(Import, th, quantity, force)
            else:  # release
                abs_quantity = -quantity
                operations.prepare(Release, th, abs_quantity, False)

            provisions_to_create.append((key, quantity))

    except QuotaholderError:
        operations.revert()
        raise

    commission = Commission.objects.create(clientkey=clientkey,
                                           name=name,
                                           issue_datetime=datetime.now())
    for (holder, source, resource), quantity in provisions_to_create:
        Provision.objects.create(serial=commission,
                                 holder=holder,
                                 source=source,
                                 resource=resource,
                                 quantity=quantity)

    return commission.serial


197
def _log_provision(commission, provision, holding, log_datetime, reason):
    """Create a ``ProvisionLog`` row for one resolved provision.

    The row records the commission's serial and name, the holding's
    identifying fields together with its current limit and usage values, the
    provision's quantity as ``delta_quantity``, the commission issue time and
    ``log_datetime`` (both passed through ``format_datetime``), and
    ``reason``.
    """
    ProvisionLog.objects.create(
        serial=commission.serial,
        name=commission.name,
        holder=holding.holder,
        source=holding.source,
        resource=holding.resource,
        limit=holding.limit,
        usage_min=holding.usage_min,
        usage_max=holding.usage_max,
        delta_quantity=provision.quantity,
        issue_time=format_datetime(commission.issue_datetime),
        log_time=format_datetime(log_datetime),
        reason=reason,
    )


def _get_commissions_for_update(clientkey, serials):
    """Select for update the commissions of ``clientkey`` among ``serials``.

    Returns a dict mapping each found commission's serial to the commission
    object; serials with no matching commission are simply absent.
    """
    locked = Commission.objects.filter(
        clientkey=clientkey, serial__in=serials).select_for_update()
    return dict((commission.serial, commission) for commission in locked)


def _partition_by(f, l):
    d = {}
    for x in l:
        group = f(x)
        group_l = d.get(group, [])
        group_l.append(x)
        d[group] = group_l
    return d


237
def resolve_pending_commissions(clientkey, accept_set=None, reject_set=None,
                                reason=''):
    """Accept and/or reject pending commissions of ``clientkey``.

    ``accept_set`` and ``reject_set`` are iterables of commission serials.
    A serial present in both is treated as conflicting and is not acted
    upon, even if it is listed several times in ``reject_set``.

    For every remaining serial whose commission is found, each of its
    provisions is applied to its holding with ``finalize`` (accept) or
    ``undo`` (reject), as an ``Import`` for a non-negative quantity or a
    ``Release`` of the absolute value otherwise.  Each provision is logged
    and deleted, and finally the commission itself is deleted.  The logged
    reason is ``'ACCEPT:'`` or ``'REJECT:'`` followed by the last 121
    characters of ``reason``.

    Returns a 4-tuple of serial lists:
    ``(accepted, rejected, notFound, conflicting)``.

    Raises:
        CorruptedError: a provision refers to a holding that was not found.
    """
    if accept_set is None:
        accept_set = []
    if reject_set is None:
        reject_set = []

    actions = dict.fromkeys(accept_set, True)
    conflicting = set()
    for serial in reject_set:
        if serial in conflicting:
            # Already flagged; a repeated entry must not turn the conflict
            # back into a rejection.
            continue
        if actions.get(serial) is True:
            actions.pop(serial)
            conflicting.add(serial)
        else:
            actions[serial] = False

    conflicting = list(conflicting)
    serials = list(actions)
    commissions = _get_commissions_for_update(clientkey, serials)
    ps = Provision.objects.filter(serial__in=serials).select_for_update()
    holding_keys = sorted(p.holding_key() for p in ps)
    holdings = _get_holdings_for_update(holding_keys)
    provisions = _partition_by(lambda p: p.serial_id, ps)

    log_datetime = datetime.now()

    accepted, rejected, notFound = [], [], []
    for serial, accept in actions.items():
        commission = commissions.get(serial)
        if commission is None:
            notFound.append(serial)
            continue

        # The action and the logged reason are the same for every provision
        # of a commission, so decide them once here.
        if accept:
            accepted.append(serial)
            action = finalize
            prefix = 'ACCEPT:'
        else:
            rejected.append(serial)
            action = undo
            prefix = 'REJECT:'
        comm_reason = prefix + reason[-121:]

        for pv in provisions.get(serial, []):
            h = holdings.get(pv.holding_key())
            if h is None:
                raise CorruptedError("Corrupted provision")

            quantity = pv.quantity
            if quantity >= 0:
                action(Import, h, quantity)
            else:  # release
                action(Release, h, -quantity)

            _log_provision(commission, pv, h, log_datetime, comm_reason)
            pv.delete()
        commission.delete()
    return accepted, rejected, notFound, conflicting


def resolve_pending_commission(clientkey, serial, accept=True):
    """Accept (or, with ``accept`` false, reject) a single commission.

    Delegates to ``resolve_pending_commissions`` with a one-element accept
    or reject set.  Returns ``True`` when the commission was resolved and
    ``False`` when it was not found.
    """
    chosen_set = 'accept_set' if accept else 'reject_set'
    accepted, rejected, notF, confl = resolve_pending_commissions(
        clientkey=clientkey, **{chosen_set: [serial]})

    if accept:
        ok, notOk = accepted, rejected
    else:
        ok, notOk = rejected, accepted

    # Internal sanity checks: nothing may land in the opposite or the
    # conflicting list, and the serial is either resolved or not found.
    assert notOk == confl == []
    assert ok + notF == [serial]
    return bool(ok)


307
def get_pending_commissions(clientkey):
    """Return the serials of the commissions stored for ``clientkey``."""
    return list(
        Commission.objects
        .filter(clientkey=clientkey)
        .values_list('serial', flat=True))


313
def get_commission(clientkey, serial):
    """Describe the commission ``serial`` of ``clientkey``.

    Returns a dict with the serial, the commission's provisions (each as
    returned by its ``todict()``), its issue datetime and its name.

    Raises:
        NoCommissionError: no such commission exists for ``clientkey``.
    """
    try:
        commission = Commission.objects.get(clientkey=clientkey,
                                            serial=serial)
    except Commission.DoesNotExist:
        raise NoCommissionError(serial)

    provision_dicts = [
        p.todict() for p in Provision.objects.filter(serial=commission)]

    return {
        'serial': serial,
        'provisions': provision_dicts,
        'issue_time': commission.issue_datetime,
        'name': commission.name,
    }