Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / quotaholder / callpoint.py @ 2b888e60

History | View | Annotate | Download (20 kB)

1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from astakos.quotaholder.exception import (
35
    QuotaholderError,
36
    CorruptedError, InvalidDataError,
37
    NoStockError, NoCapacityError,
38
    DuplicateError)
39

    
40
from astakos.quotaholder.commission import (
41
    Import, Export, Reclaim, Release, Operations)
42

    
43
from astakos.quotaholder.utils.newname import newname
44
from astakos.quotaholder.api import QH_PRACTICALLY_INFINITE
45

    
46
from django.db.models import Q, Count
47
from django.db.models import Q
48
from .models import (Holding,
49
                     Commission, Provision, ProvisionLog,
50
                     now,
51
                     db_get_holding,
52
                     db_get_commission, db_filter_provision)
53

    
54

    
55
class QuotaholderDjangoDBCallpoint(object):
56

    
57
    def _init_holding(self,
58
                      holder, resource, capacity,
59
                      imported_min, imported_max, stock_min, stock_max,
60
                      flags):
61
        try:
62
            h = db_get_holding(holder=holder, resource=resource,
63
                               for_update=True)
64
        except Holding.DoesNotExist:
65
            h = Holding(holder=holder, resource=resource)
66

    
67
        h.capacity = capacity
68
        h.flags = flags
69
        h.imported_min = imported_min
70
        h.imported_max = imported_max
71
        h.stock_min = stock_min
72
        h.stock_max = stock_max
73
        h.save()
74

    
75
    def init_holding(self, context=None, init_holding=[]):
76
        rejected = []
77
        append = rejected.append
78

    
79
        for idx, sfh in enumerate(init_holding):
80
            (holder, resource, capacity,
81
             imported_min, imported_max, stock_min, stock_max,
82
             flags) = sfh
83

    
84
            self._init_holding(holder, resource, capacity,
85
                               imported_min, imported_max,
86
                               stock_min, stock_max,
87
                               flags)
88
        if rejected:
89
            raise QuotaholderError(rejected)
90
        return rejected
91

    
92
    def reset_holding(self, context=None, reset_holding=[]):
93
        rejected = []
94
        append = rejected.append
95

    
96
        for idx, tpl in enumerate(reset_holding):
97
            (holder, resource,
98
             imported_min, imported_max, stock_min, stock_max) = tpl
99

    
100
            try:
101
                h = db_get_holding(holder=holder, resource=resource,
102
                                   for_update=True)
103
                h.imported_min = imported_min
104
                h.imported_max = imported_max
105
                h.stock_min = stock_min
106
                h.stock_max = stock_max
107
                h.save()
108
            except Holding.DoesNotExist:
109
                append(idx)
110
                continue
111

    
112
        if rejected:
113
            raise QuotaholderError(rejected)
114
        return rejected
115

    
116
    def _check_pending(self, holder, resource):
117
        cs = Commission.objects.filter(holder=holder)
118
        cs = [c for c in cs if c.provisions.filter(resource=resource)]
119
        as_target = [c.serial for c in cs]
120

    
121
        ps = Provision.objects.filter(holder=holder, resource=resource)
122
        as_source = [p.serial.serial for p in ps]
123

    
124
        return as_target + as_source
125

    
126
    def release_holding(self, context=None, release_holding=[]):
127
        rejected = []
128
        append = rejected.append
129

    
130
        for idx, (holder, resource) in enumerate(release_holding):
131
            try:
132
                h = db_get_holding(holder=holder, resource=resource,
133
                                   for_update=True)
134
            except Holding.DoesNotExist:
135
                append(idx)
136
                continue
137

    
138
            if self._check_pending(holder, resource):
139
                append(idx)
140
                continue
141

    
142
            if h.imported_max > 0:
143
                append(idx)
144
                continue
145

    
146
            h.delete()
147

    
148
        if rejected:
149
            raise QuotaholderError(rejected)
150
        return rejected
151

    
152
    def list_resources(self, context=None, holder=None):
153
        holdings = Holding.objects.filter(holder=holder)
154
        resources = [h.resource for h in holdings]
155
        return resources
156

    
157
    def list_holdings(self, context=None, list_holdings=[]):
158
        rejected = []
159
        reject = rejected.append
160
        holdings_list = []
161
        append = holdings_list.append
162

    
163
        for holder in list_holdings:
164
            holdings = list(Holding.objects.filter(holder=holder))
165
            if not holdings:
166
                reject(holder)
167
                continue
168

    
169
            append([(holder, h.resource,
170
                     h.imported_min, h.imported_max, h.stock_min, h.stock_max)
171
                    for h in holdings])
172

    
173
        return holdings_list, rejected
174

    
175
    def get_quota(self, context=None, get_quota=[]):
176
        quotas = []
177
        append = quotas.append
178

    
179
        holders = set(holder for holder, r in get_quota)
180
        hs = Holding.objects.filter(holder__in=holders)
181
        holdings = {}
182
        for h in hs:
183
            holdings[(h.holder, h.resource)] = h
184

    
185
        for holder, resource in get_quota:
186
            try:
187
                h = holdings[(holder, resource)]
188
            except:
189
                continue
190

    
191
            append((h.holder, h.resource, h.capacity,
192
                    h.imported_min, h.imported_max,
193
                    h.stock_min, h.stock_max,
194
                    h.flags))
195

    
196
        return quotas
197

    
198
    def set_quota(self, context=None, set_quota=[]):
199
        rejected = []
200
        append = rejected.append
201

    
202
        q_holdings = Q()
203
        holders = []
204
        for (holder, resource, _, _) in set_quota:
205
            holders.append(holder)
206

    
207
        hs = Holding.objects.filter(holder__in=holders).select_for_update()
208
        holdings = {}
209
        for h in hs:
210
            holdings[(h.holder, h.resource)] = h
211

    
212
        for (holder, resource,
213
             capacity,
214
             flags) in set_quota:
215

    
216
            try:
217
                h = holdings[(holder, resource)]
218
                h.flags = flags
219
            except KeyError:
220
                h = Holding(holder=holder, resource=resource,
221
                            flags=flags)
222

    
223
            h.capacity = capacity
224
            h.save()
225
            holdings[(holder, resource)] = h
226

    
227
        if rejected:
228
            raise QuotaholderError(rejected)
229
        return rejected
230

    
231
    def add_quota(self,
232
                  context=None,
233
                  sub_quota=[], add_quota=[]):
234
        rejected = []
235
        append = rejected.append
236

    
237
        sources = sub_quota + add_quota
238
        q_holdings = Q()
239
        holders = []
240
        for (holder, resource, _) in sources:
241
            holders.append(holder)
242

    
243
        hs = Holding.objects.filter(holder__in=holders).select_for_update()
244
        holdings = {}
245
        for h in hs:
246
            holdings[(h.holder, h.resource)] = h
247

    
248
        for removing, source in [(True, sub_quota), (False, add_quota)]:
249
            for (holder, resource,
250
                 capacity,
251
                 ) in source:
252

    
253
                try:
254
                    h = holdings[(holder, resource)]
255
                    current_capacity = h.capacity
256
                except KeyError:
257
                    if removing:
258
                        append((holder, resource))
259
                        continue
260

    
261
                    h = Holding(holder=holder, resource=resource, flags=0)
262
                    current_capacity = 0
263

    
264
                h.capacity = (current_capacity - capacity if removing else
265
                              current_capacity + capacity)
266

    
267
                if h.capacity < 0:
268
                    append((holder, resource))
269
                    continue
270

    
271
                h.save()
272
                holdings[(holder, resource)] = h
273

    
274
        if rejected:
275
            raise QuotaholderError(rejected)
276

    
277
        return rejected
278

    
279
    def issue_commission(self,
280
                         context=None,
281
                         clientkey=None,
282
                         target=None,
283
                         name=None,
284
                         provisions=()):
285

    
286
        create = Commission.objects.create
287
        commission = create(holder=target, clientkey=clientkey, name=name)
288
        serial = commission.serial
289

    
290
        operations = Operations()
291

    
292
        try:
293
            checked = []
294
            for holder, resource, quantity in provisions:
295

    
296
                if holder == target:
297
                    m = ("Cannot issue commission from a holder "
298
                         "to itself (%s)" % (holder,))
299
                    raise InvalidDataError(m)
300

    
301
                ent_res = holder, resource
302
                if ent_res in checked:
303
                    m = "Duplicate provision for %s.%s" % ent_res
304
                    raise DuplicateError(m)
305
                checked.append(ent_res)
306

    
307
                # Source
308
                try:
309
                    h = db_get_holding(holder=holder, resource=resource,
310
                                       for_update=True)
311
                except Holding.DoesNotExist:
312
                    m = ("%s has no stock of %s." % (holder, resource))
313
                    raise NoStockError(m,
314
                                       holder=holder,
315
                                       resource=resource,
316
                                       requested=quantity,
317
                                       current=0,
318
                                       limit=0)
319

    
320
                # Target
321
                try:
322
                    th = db_get_holding(holder=target, resource=resource,
323
                                        for_update=True)
324
                except Holding.DoesNotExist:
325
                    m = ("There is no capacity "
326
                         "to allocate into in %s.%s" % (target, resource))
327
                    raise NoCapacityError(m,
328
                                          holder=holder,
329
                                          resource=resource,
330
                                          requested=quantity,
331
                                          current=0,
332
                                          limit=0)
333

    
334
                if quantity >= 0:
335
                    operations.prepare(Export, h, quantity)
336
                    operations.prepare(Import, th, quantity)
337

    
338
                else: # release
339
                    abs_quantity = -quantity
340

    
341
                    operations.prepare(Reclaim, h, abs_quantity)
342
                    operations.prepare(Release, th, abs_quantity)
343

    
344
                Provision.objects.create(serial=commission,
345
                                         holder=holder,
346
                                         resource=resource,
347
                                         quantity=quantity)
348

    
349
        except QuotaholderError:
350
            operations.revert()
351
            raise
352

    
353
        return serial
354

    
355
    def _log_provision(self,
356
                       commission, s_holding, t_holding,
357
                       provision, log_time, reason):
358

    
359
        s_holder = s_holding.holder
360
        t_holder = t_holding.holder
361

    
362
        kwargs = {
363
            'serial':              commission.serial,
364
            'name':                commission.name,
365
            'source':              s_holder,
366
            'target':              t_holder,
367
            'resource':            provision.resource,
368
            'source_capacity':     s_holding.capacity,
369
            'source_imported_min': s_holding.imported_min,
370
            'source_imported_max': s_holding.imported_max,
371
            'source_stock_min':    s_holding.stock_min,
372
            'source_stock_max':    s_holding.stock_max,
373
            'target_capacity':     t_holding.capacity,
374
            'target_imported_min': t_holding.imported_min,
375
            'target_imported_max': t_holding.imported_max,
376
            'target_stock_min':    t_holding.stock_min,
377
            'target_stock_max':    t_holding.stock_max,
378
            'delta_quantity':      provision.quantity,
379
            'issue_time':          commission.issue_time,
380
            'log_time':            log_time,
381
            'reason':              reason,
382
        }
383

    
384
        ProvisionLog.objects.create(**kwargs)
385

    
386
    def accept_commission(self,
387
                          context=None, clientkey=None,
388
                          serials=[], reason=''):
389
        log_time = now()
390

    
391
        for serial in serials:
392
            try:
393
                c = db_get_commission(clientkey=clientkey, serial=serial,
394
                                      for_update=True)
395
            except Commission.DoesNotExist:
396
                return
397

    
398
            t = c.holder
399

    
400
            operations = Operations()
401

    
402
            provisions = db_filter_provision(serial=serial, for_update=True)
403
            for pv in provisions:
404
                try:
405
                    h = db_get_holding(holder=pv.holder,
406
                                       resource=pv.resource, for_update=True)
407
                    th = db_get_holding(holder=t, resource=pv.resource,
408
                                        for_update=True)
409
                except Holding.DoesNotExist:
410
                    m = "Corrupted provision"
411
                    raise CorruptedError(m)
412

    
413
                quantity = pv.quantity
414

    
415
                if quantity >= 0:
416
                    operations.finalize(Export, h, quantity)
417
                    operations.finalize(Import, th, quantity)
418
                else: # release
419
                    abs_quantity = -quantity
420

    
421
                    operations.finalize(Reclaim, h, abs_quantity)
422
                    operations.finalize(Release, th, abs_quantity)
423

    
424
                reason = 'ACCEPT:' + reason[-121:]
425
                self._log_provision(c, h, th, pv, log_time, reason)
426
                pv.delete()
427
            c.delete()
428

    
429
        return
430

    
431
    def reject_commission(self,
432
                          context=None, clientkey=None,
433
                          serials=[], reason=''):
434
        log_time = now()
435

    
436
        for serial in serials:
437
            try:
438
                c = db_get_commission(clientkey=clientkey, serial=serial,
439
                                      for_update=True)
440
            except Commission.DoesNotExist:
441
                return
442

    
443
            t = c.holder
444

    
445
            operations = Operations()
446

    
447
            provisions = db_filter_provision(serial=serial, for_update=True)
448
            for pv in provisions:
449
                try:
450
                    h = db_get_holding(holder=pv.holder,
451
                                       resource=pv.resource, for_update=True)
452
                    th = db_get_holding(holder=t, resource=pv.resource,
453
                                        for_update=True)
454
                except Holding.DoesNotExist:
455
                    m = "Corrupted provision"
456
                    raise CorruptedError(m)
457

    
458
                quantity = pv.quantity
459

    
460
                if quantity >= 0:
461
                    operations.undo(Export, h, quantity)
462
                    operations.undo(Import, th, quantity)
463
                else: # release
464
                    abs_quantity = -quantity
465

    
466
                    operations.undo(Reclaim, h, abs_quantity)
467
                    operations.undo(Release, th, abs_quantity)
468

    
469
                reason = 'REJECT:' + reason[-121:]
470
                self._log_provision(c, h, th, pv, log_time, reason)
471
                pv.delete()
472
            c.delete()
473

    
474
        return
475

    
476
    def get_pending_commissions(self, context=None, clientkey=None):
477
        pending = Commission.objects.filter(clientkey=clientkey)
478
        pending_list = pending.values_list('serial', flat=True)
479
        return pending_list
480

    
481
    def resolve_pending_commissions(self,
482
                                    context=None, clientkey=None,
483
                                    max_serial=None, accept_set=[]):
484
        accept_set = set(accept_set)
485
        pending = self.get_pending_commissions(context=context,
486
                                               clientkey=clientkey)
487
        pending = sorted(pending)
488

    
489
        accept = self.accept_commission
490
        reject = self.reject_commission
491

    
492
        for serial in pending:
493
            if serial > max_serial:
494
                break
495

    
496
            if serial in accept_set:
497
                accept(context=context, clientkey=clientkey, serials=[serial])
498
            else:
499
                reject(context=context, clientkey=clientkey, serials=[serial])
500

    
501
        return
502

    
503
    def get_timeline(self, context=None, after="", before="Z", get_timeline=[]):
504
        holder_set = set()
505
        e_add = holder_set.add
506
        resource_set = set()
507
        r_add = resource_set.add
508

    
509
        for holder, resource in get_timeline:
510
            if holder not in holder_set:
511
                e_add(holder)
512

    
513
            r_add((holder, resource))
514

    
515
        chunk_size = 65536
516
        nr = 0
517
        timeline = []
518
        append = timeline.append
519
        filterlogs = ProvisionLog.objects.filter
520
        if holder_set:
521
            q_holder = Q(source__in=holder_set) | Q(target__in=holder_set)
522
        else:
523
            q_holder = Q()
524

    
525
        while 1:
526
            logs = filterlogs(q_holder,
527
                              issue_time__gt=after,
528
                              issue_time__lte=before,
529
                              reason__startswith='ACCEPT:')
530

    
531
            logs = logs.order_by('issue_time')
532
            #logs = logs.values()
533
            logs = logs[:chunk_size]
534
            nr += len(logs)
535
            if not logs:
536
                break
537
            for g in logs:
538
                if ((g.source, g.resource) not in resource_set
539
                    or (g.target, g.resource) not in resource_set):
540
                    continue
541

    
542
                o = {
543
                    'serial':                   g.serial,
544
                    'source':                   g.source,
545
                    'target':                   g.target,
546
                    'resource':                 g.resource,
547
                    'name':                     g.name,
548
                    'quantity':                 g.delta_quantity,
549
                    'source_allocated':         g.source_allocated(),
550
                    'source_allocated_through': g.source_allocated_through(),
551
                    'source_inbound':           g.source_inbound(),
552
                    'source_inbound_through':   g.source_inbound_through(),
553
                    'source_outbound':          g.source_outbound(),
554
                    'source_outbound_through':  g.source_outbound_through(),
555
                    'target_allocated':         g.target_allocated(),
556
                    'target_allocated_through': g.target_allocated_through(),
557
                    'target_inbound':           g.target_inbound(),
558
                    'target_inbound_through':   g.target_inbound_through(),
559
                    'target_outbound':          g.target_outbound(),
560
                    'target_outbound_through':  g.target_outbound_through(),
561
                    'issue_time':               g.issue_time,
562
                    'log_time':                 g.log_time,
563
                    'reason':                   g.reason,
564
                }
565

    
566
                append(o)
567

    
568
            after = g.issue_time
569
            if after >= before:
570
                break
571

    
572
        return timeline
573

    
574

    
575
API_Callpoint = QuotaholderDjangoDBCallpoint