Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / quotaholder_app / callpoint.py @ 18304586

History | View | Annotate | Download (10.5 kB)

1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from datetime import datetime
35
from django.db.models import Q
36
from astakos.quotaholder_app.exception import (
37
    QuotaholderError,
38
    NoCommissionError,
39
    CorruptedError,
40
    NoHoldingError,
41
)
42

    
43
from astakos.quotaholder_app.commission import (
44
    Import, Release, Operations, finalize, undo)
45

    
46
from astakos.quotaholder_app.models import (
47
    Holding, Commission, Provision, ProvisionLog)
48

    
49

    
50
def format_datetime(d):
51
    return d.strftime('%Y-%m-%dT%H:%M:%S.%f')[:24]
52

    
53

    
54
def get_quota(holders=None, sources=None, resources=None, flt=None):
55
    if flt is None:
56
        flt = Q()
57

    
58
    holdings = Holding.objects.filter(flt)
59

    
60
    if holders is not None:
61
        holdings = holdings.filter(holder__in=holders)
62

    
63
    if sources is not None:
64
        holdings = holdings.filter(source__in=sources)
65

    
66
    if resources is not None:
67
        holdings = holdings.filter(resource__in=resources)
68

    
69
    quotas = {}
70
    for holding in holdings:
71
        key = (holding.holder, holding.source, holding.resource)
72
        value = (holding.limit, holding.usage_min, holding.usage_max)
73
        quotas[key] = value
74

    
75
    return quotas
76

    
77

    
78
def delete_quota(keys):
79
    for holder, source, resource in keys:
80
        Holding.objects.filter(holder=holder,
81
                               source=source,
82
                               resource=resource).delete()
83

    
84

    
85
def _get_holdings_for_update(holding_keys, resource=None, delete=False):
86
    flt = Q(resource=resource) if resource is not None else Q()
87
    holders = set(holder for (holder, source, resource) in holding_keys)
88
    objs = Holding.objects.filter(flt, holder__in=holders).order_by('pk')
89
    hs = objs.select_for_update()
90

    
91
    keys = set(holding_keys)
92
    holdings = {}
93
    put_back = []
94
    for h in hs:
95
        key = h.holder, h.source, h.resource
96
        if key in keys:
97
            holdings[key] = h
98
        else:
99
            put_back.append(h)
100

    
101
    if delete:
102
        objs.delete()
103
        Holding.objects.bulk_create(put_back)
104
    return holdings
105

    
106

    
107
def _mkProvision(key, quantity):
108
    holder, source, resource = key
109
    return {'holder': holder,
110
            'source': source,
111
            'resource': resource,
112
            'quantity': quantity,
113
            }
114

    
115

    
116
def set_quota(quotas, resource=None):
117
    holding_keys = [key for (key, limit) in quotas]
118
    holdings = _get_holdings_for_update(
119
        holding_keys, resource=resource, delete=True)
120

    
121
    new_holdings = {}
122
    for key, limit in quotas:
123
        holder, source, res = key
124
        if resource is not None and resource != res:
125
            continue
126
        h = Holding(holder=holder,
127
                    source=source,
128
                    resource=res,
129
                    limit=limit)
130
        try:
131
            h_old = holdings[key]
132
            h.usage_min = h_old.usage_min
133
            h.usage_max = h_old.usage_max
134
            h.id = h_old.id
135
        except KeyError:
136
            pass
137
        new_holdings[key] = h
138

    
139
    Holding.objects.bulk_create(new_holdings.values())
140

    
141

    
142
def _merge_same_keys(provisions):
143
    prov_dict = _partition_by(lambda t: t[0], provisions, lambda t: t[1])
144
    tuples = []
145
    for key, values in prov_dict.iteritems():
146
        tuples.append((key, sum(values)))
147
    return tuples
148

    
149

    
150
def issue_commission(clientkey, provisions, name="", force=False):
151
    operations = Operations()
152
    provisions_to_create = []
153

    
154
    provisions = _merge_same_keys(provisions)
155
    keys = [key for (key, value) in provisions]
156
    holdings = _get_holdings_for_update(keys)
157
    try:
158
        for key, quantity in provisions:
159
            # Target
160
            try:
161
                th = holdings[key]
162
            except KeyError:
163
                m = ("There is no such holding %s" % unicode(key))
164
                provision = _mkProvision(key, quantity)
165
                raise NoHoldingError(m,
166
                                     provision=provision)
167

    
168
            if quantity >= 0:
169
                operations.prepare(Import, th, quantity, force)
170

    
171
            else:  # release
172
                abs_quantity = -quantity
173
                operations.prepare(Release, th, abs_quantity, False)
174

    
175
            holdings[key] = th
176
            provisions_to_create.append((key, quantity))
177

    
178
    except QuotaholderError:
179
        operations.revert()
180
        raise
181

    
182
    commission = Commission.objects.create(clientkey=clientkey,
183
                                           name=name,
184
                                           issue_datetime=datetime.now())
185
    for (holder, source, resource), quantity in provisions_to_create:
186
        Provision.objects.create(serial=commission,
187
                                 holder=holder,
188
                                 source=source,
189
                                 resource=resource,
190
                                 quantity=quantity)
191

    
192
    return commission.serial
193

    
194

    
195
def _log_provision(commission, provision, holding, log_datetime, reason):
196

    
197
    kwargs = {
198
        'serial':              commission.serial,
199
        'name':                commission.name,
200
        'holder':              holding.holder,
201
        'source':              holding.source,
202
        'resource':            holding.resource,
203
        'limit':               holding.limit,
204
        'usage_min':           holding.usage_min,
205
        'usage_max':           holding.usage_max,
206
        'delta_quantity':      provision.quantity,
207
        'issue_time':          format_datetime(commission.issue_datetime),
208
        'log_time':            format_datetime(log_datetime),
209
        'reason':              reason,
210
    }
211

    
212
    ProvisionLog.objects.create(**kwargs)
213

    
214

    
215
def _get_commissions_for_update(clientkey, serials):
216
    cs = Commission.objects.filter(
217
        clientkey=clientkey, serial__in=serials).select_for_update()
218

    
219
    commissions = {}
220
    for c in cs:
221
        commissions[c.serial] = c
222
    return commissions
223

    
224

    
225
def _partition_by(f, l, convert=None):
226
    if convert is None:
227
        convert = lambda x: x
228
    d = {}
229
    for x in l:
230
        group = f(x)
231
        group_l = d.get(group, [])
232
        group_l.append(convert(x))
233
        d[group] = group_l
234
    return d
235

    
236

    
237
def resolve_pending_commissions(clientkey, accept_set=None, reject_set=None,
238
                                reason=''):
239
    if accept_set is None:
240
        accept_set = []
241
    if reject_set is None:
242
        reject_set = []
243

    
244
    actions = dict.fromkeys(accept_set, True)
245
    conflicting = set()
246
    for serial in reject_set:
247
        if actions.get(serial) is True:
248
            actions.pop(serial)
249
            conflicting.add(serial)
250
        else:
251
            actions[serial] = False
252

    
253
    conflicting = list(conflicting)
254
    serials = actions.keys()
255
    commissions = _get_commissions_for_update(clientkey, serials)
256
    ps = Provision.objects.filter(serial__in=serials).select_for_update()
257
    holding_keys = sorted(p.holding_key() for p in ps)
258
    holdings = _get_holdings_for_update(holding_keys)
259
    provisions = _partition_by(lambda p: p.serial_id, ps)
260

    
261
    log_datetime = datetime.now()
262

    
263
    accepted, rejected, notFound = [], [], []
264
    for serial, accept in actions.iteritems():
265
        commission = commissions.get(serial)
266
        if commission is None:
267
            notFound.append(serial)
268
            continue
269

    
270
        accepted.append(serial) if accept else rejected.append(serial)
271

    
272
        ps = provisions.get(serial, [])
273
        for pv in ps:
274
            key = pv.holding_key()
275
            h = holdings.get(key)
276
            if h is None:
277
                raise CorruptedError("Corrupted provision")
278

    
279
            quantity = pv.quantity
280
            action = finalize if accept else undo
281
            if quantity >= 0:
282
                action(Import, h, quantity)
283
            else:  # release
284
                action(Release, h, -quantity)
285

    
286
            prefix = 'ACCEPT:' if accept else 'REJECT:'
287
            comm_reason = prefix + reason[-121:]
288
            _log_provision(commission, pv, h, log_datetime, comm_reason)
289
            pv.delete()
290
        commission.delete()
291
    return accepted, rejected, notFound, conflicting
292

    
293

    
294
def resolve_pending_commission(clientkey, serial, accept=True):
295
    if accept:
296
        ok, notOk, notF, confl = resolve_pending_commissions(
297
            clientkey=clientkey, accept_set=[serial])
298
    else:
299
        notOk, ok, notF, confl = resolve_pending_commissions(
300
            clientkey=clientkey, reject_set=[serial])
301

    
302
    assert notOk == confl == []
303
    assert ok + notF == [serial]
304
    return bool(ok)
305

    
306

    
307
def get_pending_commissions(clientkey):
308
    pending = Commission.objects.filter(clientkey=clientkey)
309
    pending_list = pending.values_list('serial', flat=True)
310
    return list(pending_list)
311

    
312

    
313
def get_commission(clientkey, serial):
314
    try:
315
        commission = Commission.objects.get(clientkey=clientkey,
316
                                            serial=serial)
317
    except Commission.DoesNotExist:
318
        raise NoCommissionError(serial)
319

    
320
    objs = Provision.objects
321
    provisions = objs.filter(serial=commission)
322

    
323
    ps = [p.todict() for p in provisions]
324

    
325
    response = {'serial':     serial,
326
                'provisions': ps,
327
                'issue_time': commission.issue_datetime,
328
                'name':       commission.name,
329
                }
330
    return response