Revision 948e15bc snf-astakos-app/astakos/quotaholder/callpoint.py

b/snf-astakos-app/astakos/quotaholder/callpoint.py
50 50
                     now)
51 51

  
52 52

  
53
class QuotaholderDjangoDBCallpoint(object):
53
def get_quota(holders=None, sources=None, resources=None):
54
    holdings = Holding.objects.all()
54 55

  
55
    def get_quota(self, holders=None, sources=None, resources=None):
56
        holdings = Holding.objects.all()
56
    if holders is not None:
57
        holdings = holdings.filter(holder__in=holders)
57 58

  
58
        if holders is not None:
59
            holdings = holdings.filter(holder__in=holders)
59
    if sources is not None:
60
        holdings = holdings.filter(source__in=sources)
60 61

  
61
        if sources is not None:
62
            holdings = holdings.filter(source__in=sources)
62
    if resources is not None:
63
        holdings = holdings.filter(resource__in=resources)
63 64

  
64
        if resources is not None:
65
            holdings = holdings.filter(resource__in=resources)
65
    quotas = {}
66
    for holding in holdings:
67
        key = (holding.holder, holding.source, holding.resource)
68
        value = (holding.limit, holding.imported_min, holding.imported_max)
69
        quotas[key] = value
66 70

  
67
        quotas = {}
68
        for holding in holdings:
69
            key = (holding.holder, holding.source, holding.resource)
70
            value = (holding.limit, holding.imported_min, holding.imported_max)
71
            quotas[key] = value
71
    return quotas
72 72

  
73
        return quotas
74 73

  
75
    def _get_holdings_for_update(self, holding_keys):
76
        holding_keys = sorted(holding_keys)
77
        holdings = {}
78
        for (holder, source, resource) in holding_keys:
79
            try:
80
                h = Holding.objects.get_for_update(
81
                    holder=holder, source=source, resource=resource)
82
                holdings[(holder, source, resource)] = h
83
            except Holding.DoesNotExist:
84
                pass
85
        return holdings
86

  
87
    def _provisions_to_list(self, provisions):
88
        lst = []
89
        for provision in provisions:
90
            try:
91
                holder = provision['holder']
92
                source = provision['source']
93
                resource = provision['resource']
94
                quantity = provision['quantity']
95
                key = (holder, source, resource)
96
                lst.append((key, quantity))
97
            except KeyError:
98
                raise InvalidDataError("Malformed provision")
99
        return lst
100

  
101
    def _mkProvision(self, key, quantity):
102
        holder, source, resource = key
103
        return {'holder': holder,
104
                'source': source,
105
                'resource': resource,
106
                'quantity': quantity,
107
                }
108

  
109
    def set_quota(self, quotas):
110
        holding_keys = [key for (key, limit) in quotas]
111
        holdings = self._get_holdings_for_update(holding_keys)
112

  
113
        for key, limit in quotas:
114
            try:
115
                h = holdings[key]
116
            except KeyError:
117
                holder, source, resource = key
118
                h = Holding(holder=holder,
119
                            source=source,
120
                            resource=resource)
121
            h.limit = limit
122
            h.save()
123
            holdings[key] = h
124

  
125
    def add_resource_limit(self, holders=None, sources=None, resources=None,
126
                           diff=0):
127
        holdings = Holding.objects.all()
128

  
129
        if holders is not None:
130
            holdings = holdings.filter(holder__in=holders)
74
def _get_holdings_for_update(holding_keys):
75
    holding_keys = sorted(holding_keys)
76
    holdings = {}
77
    for (holder, source, resource) in holding_keys:
78
        try:
79
            h = Holding.objects.get_for_update(
80
                holder=holder, source=source, resource=resource)
81
            holdings[(holder, source, resource)] = h
82
        except Holding.DoesNotExist:
83
            pass
84
    return holdings
131 85

  
132
        if sources is not None:
133
            holdings = holdings.filter(source__in=sources)
134 86

  
135
        if resources is not None:
136
            holdings = holdings.filter(resource__in=resources)
87
def _provisions_to_list(provisions):
88
    lst = []
89
    for provision in provisions:
90
        try:
91
            holder = provision['holder']
92
            source = provision['source']
93
            resource = provision['resource']
94
            quantity = provision['quantity']
95
            key = (holder, source, resource)
96
            lst.append((key, quantity))
97
        except KeyError:
98
            raise InvalidDataError("Malformed provision")
99
    return lst
100

  
101

  
102
def _mkProvision(key, quantity):
103
    holder, source, resource = key
104
    return {'holder': holder,
105
            'source': source,
106
            'resource': resource,
107
            'quantity': quantity,
108
            }
109

  
110

  
111
def set_quota(quotas):
112
    holding_keys = [key for (key, limit) in quotas]
113
    holdings = _get_holdings_for_update(holding_keys)
114

  
115
    for key, limit in quotas:
116
        try:
117
            h = holdings[key]
118
        except KeyError:
119
            holder, source, resource = key
120
            h = Holding(holder=holder,
121
                        source=source,
122
                        resource=resource)
123
        h.limit = limit
124
        h.save()
125
        holdings[key] = h
137 126

  
138
        holdings.update(limit=F('limit')+diff)
139 127

  
140
    def issue_commission(self,
141
                         context=None,
142
                         clientkey=None,
143
                         name=None,
144
                         force=False,
145
                         provisions=()):
128
def add_resource_limit(holders=None, sources=None, resources=None, diff=0):
129
    holdings = Holding.objects.all()
146 130

  
147
        if name is None:
148
            name = ""
131
    if holders is not None:
132
        holdings = holdings.filter(holder__in=holders)
149 133

  
150
        operations = Operations()
151
        provisions_to_create = []
134
    if sources is not None:
135
        holdings = holdings.filter(source__in=sources)
152 136

  
153
        provisions = self._provisions_to_list(provisions)
154
        keys = [key for (key, value) in provisions]
155
        holdings = self._get_holdings_for_update(keys)
156
        try:
157
            checked = []
158
            for key, quantity in provisions:
159
                if not isinstance(quantity, (int, long)):
160
                    raise InvalidDataError("Malformed provision")
161

  
162
                if key in checked:
163
                    m = "Duplicate provision for %s" % str(key)
164
                    provision = self._mkProvision(key, quantity)
165
                    raise DuplicateError(m,
166
                                         provision=provision)
167
                checked.append(key)
168

  
169
                # Target
170
                try:
171
                    th = holdings[key]
172
                except KeyError:
173
                    m = ("There is no such holding %s" % str(key))
174
                    provision = self._mkProvision(key, quantity)
175
                    raise NoHoldingError(m,
176
                                         provision=provision)
177

  
178
                if quantity >= 0:
179
                    operations.prepare(Import, th, quantity, force)
180

  
181
                else: # release
182
                    abs_quantity = -quantity
183
                    operations.prepare(Release, th, abs_quantity, force)
184

  
185
                holdings[key] = th
186
                provisions_to_create.append((key, quantity))
187

  
188
        except QuotaholderError:
189
            operations.revert()
190
            raise
191

  
192
        commission = Commission.objects.create(clientkey=clientkey, name=name)
193
        for (holder, source, resource), quantity in provisions_to_create:
194
            Provision.objects.create(serial=commission,
195
                                     holder=holder,
196
                                     source=source,
197
                                     resource=resource,
198
                                     quantity=quantity)
199

  
200
        return commission.serial
201

  
202
    def _log_provision(self,
203
                       commission, provision, holding, log_time, reason):
204

  
205
        kwargs = {
206
            'serial':              commission.serial,
207
            'name':                commission.name,
208
            'holder':              holding.holder,
209
            'source':              holding.source,
210
            'resource':            holding.resource,
211
            'limit':               holding.limit,
212
            'imported_min':        holding.imported_min,
213
            'imported_max':        holding.imported_max,
214
            'delta_quantity':      provision.quantity,
215
            'issue_time':          commission.issue_time,
216
            'log_time':            log_time,
217
            'reason':              reason,
218
        }
219

  
220
        ProvisionLog.objects.create(**kwargs)
221

  
222
    def _get_commissions_for_update(self, clientkey, serials):
223
        cs = Commission.objects.filter(
224
            clientkey=clientkey, serial__in=serials).select_for_update()
225

  
226
        commissions = {}
227
        for c in cs:
228
            commissions[c.serial] = c
229
        return commissions
230

  
231
    def _partition_by(self, f, l):
232
        d = {}
233
        for x in l:
234
            group = f(x)
235
            group_l = d.get(group, [])
236
            group_l.append(x)
237
            d[group] = group_l
238
        return d
239

  
240
    def resolve_pending_commissions(self,
241
                                    context=None, clientkey=None,
242
                                    accept_set=[], reject_set=[],
243
                                    reason=''):
244
        actions = dict.fromkeys(accept_set, True)
245
        conflicting = set()
246
        for serial in reject_set:
247
            if actions.get(serial) is True:
248
                actions.pop(serial)
249
                conflicting.add(serial)
250
            else:
251
                actions[serial] = False
252

  
253
        conflicting = list(conflicting)
254
        serials = actions.keys()
255
        commissions = self._get_commissions_for_update(clientkey, serials)
256
        ps = Provision.objects.filter(serial__in=serials).select_for_update()
257
        holding_keys = sorted(p.holding_key() for p in ps)
258
        holdings = self._get_holdings_for_update(holding_keys)
259
        provisions = self._partition_by(lambda p: p.serial_id, ps)
260

  
261
        log_time = now()
262

  
263
        accepted, rejected, notFound = [], [], []
264
        for serial, accept in actions.iteritems():
265
            commission = commissions.get(serial)
266
            if commission is None:
267
                notFound.append(serial)
268
                continue
269

  
270
            accepted.append(serial) if accept else rejected.append(serial)
271

  
272
            ps = provisions.get(serial)
273
            assert ps is not None
274
            for pv in ps:
275
                key = pv.holding_key()
276
                h = holdings.get(key)
277
                if h is None:
278
                    raise CorruptedError("Corrupted provision")
279

  
280
                quantity = pv.quantity
281
                action = finalize if accept else undo
282
                if quantity >= 0:
283
                    action(Import, h, quantity)
284
                else:  # release
285
                    action(Release, h, -quantity)
286

  
287
                prefix = 'ACCEPT:' if accept else 'REJECT:'
288
                comm_reason = prefix + reason[-121:]
289
                self._log_provision(commission, pv, h, log_time, comm_reason)
290
                pv.delete()
291
            commission.delete()
292
        return accepted, rejected, notFound, conflicting
293

  
294
    def resolve_pending_commission(self, clientkey, serial, accept=True):
295
        if accept:
296
            ok, notOk, notF, confl = self.resolve_pending_commissions(
297
                clientkey=clientkey, accept_set=[serial])
298
        else:
299
            notOk, ok, notF, confl = self.resolve_pending_commissions(
300
                clientkey=clientkey, reject_set=[serial])
137
    if resources is not None:
138
        holdings = holdings.filter(resource__in=resources)
301 139

  
302
        assert notOk == confl == []
303
        assert ok + notF == [serial]
304
        return bool(ok)
140
    holdings.update(limit=F('limit')+diff)
305 141

  
306
    def get_pending_commissions(self, context=None, clientkey=None):
307
        pending = Commission.objects.filter(clientkey=clientkey)
308
        pending_list = pending.values_list('serial', flat=True)
309
        return list(pending_list)
310 142

  
311
    def get_commission(self, clientkey=None, serial=None):
312
        try:
313
            commission = Commission.objects.get(clientkey=clientkey,
314
                                                serial=serial)
315
        except Commission.DoesNotExist:
316
            raise NoCommissionError(serial)
143
def issue_commission(context=None,
144
                     clientkey=None,
145
                     name=None,
146
                     force=False,
147
                     provisions=()):
317 148

  
318
        objs = Provision.objects.select_related('holding')
319
        provisions = objs.filter(serial=commission)
149
    if name is None:
150
        name = ""
320 151

  
321
        ps = [p.todict() for p in provisions]
152
    operations = Operations()
153
    provisions_to_create = []
322 154

  
323
        response = {'serial':     serial,
324
                    'provisions': ps,
325
                    'issue_time': commission.issue_time,
326
                    }
327
        return response
155
    provisions = _provisions_to_list(provisions)
156
    keys = [key for (key, value) in provisions]
157
    holdings = _get_holdings_for_update(keys)
158
    try:
159
        checked = []
160
        for key, quantity in provisions:
161
            if not isinstance(quantity, (int, long)):
162
                raise InvalidDataError("Malformed provision")
328 163

  
164
            if key in checked:
165
                m = "Duplicate provision for %s" % str(key)
166
                provision = _mkProvision(key, quantity)
167
                raise DuplicateError(m,
168
                                     provision=provision)
169
            checked.append(key)
329 170

  
330
API_Callpoint = QuotaholderDjangoDBCallpoint
171
            # Target
172
            try:
173
                th = holdings[key]
174
            except KeyError:
175
                m = ("There is no such holding %s" % str(key))
176
                provision = _mkProvision(key, quantity)
177
                raise NoHoldingError(m,
178
                                     provision=provision)
179

  
180
            if quantity >= 0:
181
                operations.prepare(Import, th, quantity, force)
182

  
183
            else:  # release
184
                abs_quantity = -quantity
185
                operations.prepare(Release, th, abs_quantity, force)
186

  
187
            holdings[key] = th
188
            provisions_to_create.append((key, quantity))
189

  
190
    except QuotaholderError:
191
        operations.revert()
192
        raise
193

  
194
    commission = Commission.objects.create(clientkey=clientkey, name=name)
195
    for (holder, source, resource), quantity in provisions_to_create:
196
        Provision.objects.create(serial=commission,
197
                                 holder=holder,
198
                                 source=source,
199
                                 resource=resource,
200
                                 quantity=quantity)
201

  
202
    return commission.serial
203

  
204

  
205
def _log_provision(commission, provision, holding, log_time, reason):
206

  
207
    kwargs = {
208
        'serial':              commission.serial,
209
        'name':                commission.name,
210
        'holder':              holding.holder,
211
        'source':              holding.source,
212
        'resource':            holding.resource,
213
        'limit':               holding.limit,
214
        'imported_min':        holding.imported_min,
215
        'imported_max':        holding.imported_max,
216
        'delta_quantity':      provision.quantity,
217
        'issue_time':          commission.issue_time,
218
        'log_time':            log_time,
219
        'reason':              reason,
220
    }
221

  
222
    ProvisionLog.objects.create(**kwargs)
223

  
224

  
225
def _get_commissions_for_update(clientkey, serials):
226
    cs = Commission.objects.filter(
227
        clientkey=clientkey, serial__in=serials).select_for_update()
228

  
229
    commissions = {}
230
    for c in cs:
231
        commissions[c.serial] = c
232
    return commissions
233

  
234

  
235
def _partition_by(f, l):
236
    d = {}
237
    for x in l:
238
        group = f(x)
239
        group_l = d.get(group, [])
240
        group_l.append(x)
241
        d[group] = group_l
242
    return d
243

  
244

  
245
def resolve_pending_commissions(context=None, clientkey=None,
246
                                accept_set=[], reject_set=[],
247
                                reason=''):
248
    actions = dict.fromkeys(accept_set, True)
249
    conflicting = set()
250
    for serial in reject_set:
251
        if actions.get(serial) is True:
252
            actions.pop(serial)
253
            conflicting.add(serial)
254
        else:
255
            actions[serial] = False
256

  
257
    conflicting = list(conflicting)
258
    serials = actions.keys()
259
    commissions = _get_commissions_for_update(clientkey, serials)
260
    ps = Provision.objects.filter(serial__in=serials).select_for_update()
261
    holding_keys = sorted(p.holding_key() for p in ps)
262
    holdings = _get_holdings_for_update(holding_keys)
263
    provisions = _partition_by(lambda p: p.serial_id, ps)
264

  
265
    log_time = now()
266

  
267
    accepted, rejected, notFound = [], [], []
268
    for serial, accept in actions.iteritems():
269
        commission = commissions.get(serial)
270
        if commission is None:
271
            notFound.append(serial)
272
            continue
273

  
274
        accepted.append(serial) if accept else rejected.append(serial)
275

  
276
        ps = provisions.get(serial)
277
        assert ps is not None
278
        for pv in ps:
279
            key = pv.holding_key()
280
            h = holdings.get(key)
281
            if h is None:
282
                raise CorruptedError("Corrupted provision")
283

  
284
            quantity = pv.quantity
285
            action = finalize if accept else undo
286
            if quantity >= 0:
287
                action(Import, h, quantity)
288
            else:  # release
289
                action(Release, h, -quantity)
290

  
291
            prefix = 'ACCEPT:' if accept else 'REJECT:'
292
            comm_reason = prefix + reason[-121:]
293
            _log_provision(commission, pv, h, log_time, comm_reason)
294
            pv.delete()
295
        commission.delete()
296
    return accepted, rejected, notFound, conflicting
297

  
298

  
299
def resolve_pending_commission(clientkey, serial, accept=True):
300
    if accept:
301
        ok, notOk, notF, confl = resolve_pending_commissions(
302
            clientkey=clientkey, accept_set=[serial])
303
    else:
304
        notOk, ok, notF, confl = resolve_pending_commissions(
305
            clientkey=clientkey, reject_set=[serial])
306

  
307
    assert notOk == confl == []
308
    assert ok + notF == [serial]
309
    return bool(ok)
310

  
311

  
312
def get_pending_commissions(context=None, clientkey=None):
313
    pending = Commission.objects.filter(clientkey=clientkey)
314
    pending_list = pending.values_list('serial', flat=True)
315
    return list(pending_list)
316

  
317

  
318
def get_commission(clientkey=None, serial=None):
319
    try:
320
        commission = Commission.objects.get(clientkey=clientkey,
321
                                            serial=serial)
322
    except Commission.DoesNotExist:
323
        raise NoCommissionError(serial)
324

  
325
    objs = Provision.objects.select_related('holding')
326
    provisions = objs.filter(serial=commission)
327

  
328
    ps = [p.todict() for p in provisions]
329

  
330
    response = {'serial':     serial,
331
                'provisions': ps,
332
                'issue_time': commission.issue_time,
333
                }
334
    return response

Also available in: Unified diff