Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / quotaholder / callpoint.py @ d03796c2

History | View | Annotate | Download (23.9 kB)

1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from astakos.quotaholder.exception import (
35
    QuotaholderError,
36
    CorruptedError, InvalidDataError,
37
    NoStockError, NoCapacityError,
38
    DuplicateError)
39

    
40
from astakos.quotaholder.commission import (
41
    Import, Export, Reclaim, Release, Operations)
42

    
43
from astakos.quotaholder.utils.newname import newname
44
from astakos.quotaholder.api import QH_PRACTICALLY_INFINITE
45

    
46
from django.db.models import Q, Count
47
from django.db.models import Q
48
from .models import (Policy, Holding,
49
                     Commission, Provision, ProvisionLog,
50
                     now,
51
                     db_get_holding, db_get_policy,
52
                     db_get_commission, db_filter_provision)
53

    
54

    
55
class QuotaholderDjangoDBCallpoint(object):
56

    
57
    def get_limits(self, context=None, get_limits=[]):
58
        limits = []
59
        append = limits.append
60

    
61
        for policy in get_limits:
62
            try:
63
                p = Policy.objects.get(policy=policy)
64
            except Policy.DoesNotExist:
65
                continue
66

    
67
            append((policy, p.capacity))
68

    
69
        return limits
70

    
71
    def set_limits(self, context=None, set_limits=[]):
72

    
73
        for (policy, capacity) in set_limits:
74

    
75
            try:
76
                policy = db_get_policy(policy=policy, for_update=True)
77
            except Policy.DoesNotExist:
78
                Policy.objects.create(policy=policy,
79
                                      capacity=capacity,
80
                                      )
81
            else:
82
                policy.capacity = capacity
83
                policy.save()
84

    
85
        return ()
86

    
87
    def get_holding(self, context=None, get_holding=[]):
88
        holdings = []
89
        append = holdings.append
90

    
91
        for holder, resource in get_holding:
92
            try:
93
                h = Holding.objects.get(holder=holder, resource=resource)
94
            except Holding.DoesNotExist:
95
                continue
96

    
97
            append((h.holder, h.resource, h.policy.policy,
98
                    h.imported_min, h.imported_max,
99
                    h.stock_min, h.stock_max, h.flags))
100

    
101
        return holdings
102

    
103
    def set_holding(self, context=None, set_holding=[]):
104
        rejected = []
105
        append = rejected.append
106

    
107
        for holder, resource, policy, flags in set_holding:
108
            try:
109
                p = Policy.objects.get(policy=policy)
110
            except Policy.DoesNotExist:
111
                append((holder, resource, policy))
112
                continue
113

    
114
            try:
115
                h = db_get_holding(holder=holder, resource=resource,
116
                                   for_update=True)
117
                h.policy = p
118
                h.flags = flags
119
                h.save()
120
            except Holding.DoesNotExist:
121
                h = Holding.objects.create(holder=holder, resource=resource,
122
                                           policy=p, flags=flags)
123

    
124
        if rejected:
125
            raise QuotaholderError(rejected)
126
        return rejected
127

    
128
    def _init_holding(self,
129
                      holder, resource, policy,
130
                      imported_min, imported_max, stock_min, stock_max,
131
                      flags):
132
        try:
133
            h = db_get_holding(holder=holder, resource=resource,
134
                               for_update=True)
135
        except Holding.DoesNotExist:
136
            h = Holding(holder=holder, resource=resource)
137

    
138
        h.policy = policy
139
        h.flags = flags
140
        h.imported_min = imported_min
141
        h.imported_max = imported_max
142
        h.stock_min = stock_min
143
        h.stock_max = stock_max
144
        h.save()
145

    
146
    def init_holding(self, context=None, init_holding=[]):
147
        rejected = []
148
        append = rejected.append
149

    
150
        for idx, sfh in enumerate(init_holding):
151
            (holder, resource, policy,
152
             imported_min, imported_max, stock_min, stock_max,
153
             flags) = sfh
154

    
155
            try:
156
                p = Policy.objects.get(policy=policy)
157
            except Policy.DoesNotExist:
158
                append(idx)
159
                continue
160

    
161
            self._init_holding(holder, resource, p,
162
                               imported_min, imported_max,
163
                               stock_min, stock_max,
164
                               flags)
165
        if rejected:
166
            raise QuotaholderError(rejected)
167
        return rejected
168

    
169
    def reset_holding(self, context=None, reset_holding=[]):
170
        rejected = []
171
        append = rejected.append
172

    
173
        for idx, tpl in enumerate(reset_holding):
174
            (holder, resource,
175
             imported_min, imported_max, stock_min, stock_max) = tpl
176

    
177
            try:
178
                h = db_get_holding(holder=holder, resource=resource,
179
                                   for_update=True)
180
                h.imported_min = imported_min
181
                h.imported_max = imported_max
182
                h.stock_min = stock_min
183
                h.stock_max = stock_max
184
                h.save()
185
            except Holding.DoesNotExist:
186
                append(idx)
187
                continue
188

    
189
        if rejected:
190
            raise QuotaholderError(rejected)
191
        return rejected
192

    
193
    def _check_pending(self, holder, resource):
194
        cs = Commission.objects.filter(holder=holder)
195
        cs = [c for c in cs if c.provisions.filter(resource=resource)]
196
        as_target = [c.serial for c in cs]
197

    
198
        ps = Provision.objects.filter(holder=holder, resource=resource)
199
        as_source = [p.serial.serial for p in ps]
200

    
201
        return as_target + as_source
202

    
203
    def release_holding(self, context=None, release_holding=[]):
204
        rejected = []
205
        append = rejected.append
206

    
207
        for idx, (holder, resource) in enumerate(release_holding):
208
            try:
209
                h = db_get_holding(holder=holder, resource=resource,
210
                                   for_update=True)
211
            except Holding.DoesNotExist:
212
                append(idx)
213
                continue
214

    
215
            if self._check_pending(holder, resource):
216
                append(idx)
217
                continue
218

    
219
            if h.imported_max > 0:
220
                append(idx)
221
                continue
222

    
223
            h.delete()
224

    
225
        if rejected:
226
            raise QuotaholderError(rejected)
227
        return rejected
228

    
229
    def list_resources(self, context=None, holder=None):
230
        holdings = Holding.objects.filter(holder=holder)
231
        resources = [h.resource for h in holdings]
232
        return resources
233

    
234
    def list_holdings(self, context=None, list_holdings=[]):
235
        rejected = []
236
        reject = rejected.append
237
        holdings_list = []
238
        append = holdings_list.append
239

    
240
        for holder in list_holdings:
241
            holdings = list(Holding.objects.filter(holder=holder))
242
            if not holdings:
243
                reject(holder)
244
                continue
245

    
246
            append([(holder, h.resource,
247
                     h.imported_min, h.imported_max, h.stock_min, h.stock_max)
248
                    for h in holdings])
249

    
250
        return holdings_list, rejected
251

    
252
    def get_quota(self, context=None, get_quota=[]):
253
        quotas = []
254
        append = quotas.append
255

    
256
        holders = set(holder for holder, r in get_quota)
257
        hs = Holding.objects.select_related().filter(holder__in=holders)
258
        holdings = {}
259
        for h in hs:
260
            holdings[(h.holder, h.resource)] = h
261

    
262
        for holder, resource in get_quota:
263
            try:
264
                h = holdings[(holder, resource)]
265
            except:
266
                continue
267

    
268
            p = h.policy
269

    
270
            append((h.holder, h.resource, p.capacity,
271
                    h.imported_min, h.imported_max,
272
                    h.stock_min, h.stock_max,
273
                    h.flags))
274

    
275
        return quotas
276

    
277
    def set_quota(self, context=None, set_quota=[]):
278
        rejected = []
279
        append = rejected.append
280

    
281
        q_holdings = Q()
282
        holders = []
283
        for (holder, resource, _, _) in set_quota:
284
            holders.append(holder)
285

    
286
        hs = Holding.objects.filter(holder__in=holders).select_for_update()
287
        holdings = {}
288
        for h in hs:
289
            holdings[(h.holder, h.resource)] = h
290

    
291
        old_policies = []
292

    
293
        for (holder, resource,
294
             capacity,
295
             flags) in set_quota:
296

    
297
            policy = newname('policy_')
298
            newp = Policy(policy=policy,
299
                          capacity=capacity,
300
                          )
301

    
302
            try:
303
                h = holdings[(holder, resource)]
304
                old_policies.append(h.policy_id)
305
                h.policy = newp
306
                h.flags = flags
307
            except KeyError:
308
                h = Holding(holder=holder, resource=resource,
309
                            policy=newp, flags=flags)
310

    
311
            # the order is intentionally reversed so that it
312
            # would break if we are not within a transaction.
313
            # Has helped before.
314
            h.save()
315
            newp.save()
316
            holdings[(holder, resource)] = h
317

    
318
        objs = Policy.objects.annotate(refs=Count('holding'))
319
        objs.filter(policy__in=old_policies, refs=0).delete()
320

    
321
        if rejected:
322
            raise QuotaholderError(rejected)
323
        return rejected
324

    
325
    def add_quota(self,
326
                  context=None,
327
                  sub_quota=[], add_quota=[]):
328
        rejected = []
329
        append = rejected.append
330

    
331
        sources = sub_quota + add_quota
332
        q_holdings = Q()
333
        holders = []
334
        for (holder, resource, _) in sources:
335
            holders.append(holder)
336

    
337
        hs = Holding.objects.filter(holder__in=holders).select_for_update()
338
        holdings = {}
339
        for h in hs:
340
            holdings[(h.holder, h.resource)] = h
341

    
342
        pids = [h.policy_id for h in hs]
343
        policies = Policy.objects.in_bulk(pids)
344

    
345
        old_policies = []
346

    
347
        for removing, source in [(True, sub_quota), (False, add_quota)]:
348
            for (holder, resource,
349
                 capacity,
350
                 ) in source:
351

    
352
                try:
353
                    h = holdings[(holder, resource)]
354
                    old_policies.append(h.policy_id)
355
                    try:
356
                        p = policies[h.policy_id]
357
                    except KeyError:
358
                        raise AssertionError("no policy %s" % h.policy_id)
359
                except KeyError:
360
                    if removing:
361
                        append((holder, resource))
362
                        continue
363

    
364
                    h = Holding(holder=holder, resource=resource, flags=0)
365
                    p = None
366

    
367
                policy = newname('policy_')
368
                newp = Policy(policy=policy)
369

    
370
                newp.capacity = _add(p.capacity if p else 0, capacity,
371
                                     invert=removing)
372

    
373
                if _isneg(newp.capacity):
374
                    append((holder, resource))
375
                    continue
376

    
377
                h.policy = newp
378

    
379
                # the order is intentionally reversed so that it
380
                # would break if we are not within a transaction.
381
                # Has helped before.
382
                h.save()
383
                newp.save()
384
                policies[policy] = newp
385
                holdings[(holder, resource)] = h
386

    
387
        objs = Policy.objects.annotate(refs=Count('holding'))
388
        objs.filter(policy__in=old_policies, refs=0).delete()
389

    
390
        if rejected:
391
            raise QuotaholderError(rejected)
392

    
393
        return rejected
394

    
395
    def issue_commission(self,
396
                         context=None,
397
                         clientkey=None,
398
                         target=None,
399
                         name=None,
400
                         provisions=()):
401

    
402
        create = Commission.objects.create
403
        commission = create(holder=target, clientkey=clientkey, name=name)
404
        serial = commission.serial
405

    
406
        operations = Operations()
407

    
408
        try:
409
            checked = []
410
            for holder, resource, quantity in provisions:
411

    
412
                if holder == target:
413
                    m = ("Cannot issue commission from a holder "
414
                         "to itself (%s)" % (holder,))
415
                    raise InvalidDataError(m)
416

    
417
                ent_res = holder, resource
418
                if ent_res in checked:
419
                    m = "Duplicate provision for %s.%s" % ent_res
420
                    raise DuplicateError(m)
421
                checked.append(ent_res)
422

    
423
                # Source
424
                try:
425
                    h = db_get_holding(holder=holder, resource=resource,
426
                                       for_update=True)
427
                except Holding.DoesNotExist:
428
                    m = ("%s has no stock of %s." % (holder, resource))
429
                    raise NoStockError(m,
430
                                       holder=holder,
431
                                       resource=resource,
432
                                       requested=quantity,
433
                                       current=0,
434
                                       limit=0)
435

    
436
                # Target
437
                try:
438
                    th = db_get_holding(holder=target, resource=resource,
439
                                        for_update=True)
440
                except Holding.DoesNotExist:
441
                    m = ("There is no capacity "
442
                         "to allocate into in %s.%s" % (target, resource))
443
                    raise NoCapacityError(m,
444
                                          holder=holder,
445
                                          resource=resource,
446
                                          requested=quantity,
447
                                          current=0,
448
                                          limit=0)
449

    
450
                if quantity >= 0:
451
                    operations.prepare(Export, h, quantity)
452
                    operations.prepare(Import, th, quantity)
453

    
454
                else: # release
455
                    abs_quantity = -quantity
456

    
457
                    operations.prepare(Reclaim, h, abs_quantity)
458
                    operations.prepare(Release, th, abs_quantity)
459

    
460
                Provision.objects.create(serial=commission,
461
                                         holder=holder,
462
                                         resource=resource,
463
                                         quantity=quantity)
464

    
465
        except QuotaholderError:
466
            operations.revert()
467
            raise
468

    
469
        return serial
470

    
471
    def _log_provision(self,
472
                       commission, s_holding, t_holding,
473
                       provision, log_time, reason):
474

    
475
        s_holder = s_holding.holder
476
        s_policy = s_holding.policy
477
        t_holder = t_holding.holder
478
        t_policy = t_holding.policy
479

    
480
        kwargs = {
481
            'serial':              commission.serial,
482
            'name':                commission.name,
483
            'source':              s_holder,
484
            'target':              t_holder,
485
            'resource':            provision.resource,
486
            'source_capacity':     s_policy.capacity,
487
            'source_imported_min': s_holding.imported_min,
488
            'source_imported_max': s_holding.imported_max,
489
            'source_stock_min':    s_holding.stock_min,
490
            'source_stock_max':    s_holding.stock_max,
491
            'target_capacity':     t_policy.capacity,
492
            'target_imported_min': t_holding.imported_min,
493
            'target_imported_max': t_holding.imported_max,
494
            'target_stock_min':    t_holding.stock_min,
495
            'target_stock_max':    t_holding.stock_max,
496
            'delta_quantity':      provision.quantity,
497
            'issue_time':          commission.issue_time,
498
            'log_time':            log_time,
499
            'reason':              reason,
500
        }
501

    
502
        ProvisionLog.objects.create(**kwargs)
503

    
504
    def accept_commission(self,
505
                          context=None, clientkey=None,
506
                          serials=[], reason=''):
507
        log_time = now()
508

    
509
        for serial in serials:
510
            try:
511
                c = db_get_commission(clientkey=clientkey, serial=serial,
512
                                      for_update=True)
513
            except Commission.DoesNotExist:
514
                return
515

    
516
            t = c.holder
517

    
518
            operations = Operations()
519

    
520
            provisions = db_filter_provision(serial=serial, for_update=True)
521
            for pv in provisions:
522
                try:
523
                    h = db_get_holding(holder=pv.holder,
524
                                       resource=pv.resource, for_update=True)
525
                    th = db_get_holding(holder=t, resource=pv.resource,
526
                                        for_update=True)
527
                except Holding.DoesNotExist:
528
                    m = "Corrupted provision"
529
                    raise CorruptedError(m)
530

    
531
                quantity = pv.quantity
532

    
533
                if quantity >= 0:
534
                    operations.finalize(Export, h, quantity)
535
                    operations.finalize(Import, th, quantity)
536
                else: # release
537
                    abs_quantity = -quantity
538

    
539
                    operations.finalize(Reclaim, h, abs_quantity)
540
                    operations.finalize(Release, th, abs_quantity)
541

    
542
                reason = 'ACCEPT:' + reason[-121:]
543
                self._log_provision(c, h, th, pv, log_time, reason)
544
                pv.delete()
545
            c.delete()
546

    
547
        return
548

    
549
    def reject_commission(self,
550
                          context=None, clientkey=None,
551
                          serials=[], reason=''):
552
        log_time = now()
553

    
554
        for serial in serials:
555
            try:
556
                c = db_get_commission(clientkey=clientkey, serial=serial,
557
                                      for_update=True)
558
            except Commission.DoesNotExist:
559
                return
560

    
561
            t = c.holder
562

    
563
            operations = Operations()
564

    
565
            provisions = db_filter_provision(serial=serial, for_update=True)
566
            for pv in provisions:
567
                try:
568
                    h = db_get_holding(holder=pv.holder,
569
                                       resource=pv.resource, for_update=True)
570
                    th = db_get_holding(holder=t, resource=pv.resource,
571
                                        for_update=True)
572
                except Holding.DoesNotExist:
573
                    m = "Corrupted provision"
574
                    raise CorruptedError(m)
575

    
576
                quantity = pv.quantity
577

    
578
                if quantity >= 0:
579
                    operations.undo(Export, h, quantity)
580
                    operations.undo(Import, th, quantity)
581
                else: # release
582
                    abs_quantity = -quantity
583

    
584
                    operations.undo(Reclaim, h, abs_quantity)
585
                    operations.undo(Release, th, abs_quantity)
586

    
587
                reason = 'REJECT:' + reason[-121:]
588
                self._log_provision(c, h, th, pv, log_time, reason)
589
                pv.delete()
590
            c.delete()
591

    
592
        return
593

    
594
    def get_pending_commissions(self, context=None, clientkey=None):
595
        pending = Commission.objects.filter(clientkey=clientkey)
596
        pending_list = pending.values_list('serial', flat=True)
597
        return pending_list
598

    
599
    def resolve_pending_commissions(self,
600
                                    context=None, clientkey=None,
601
                                    max_serial=None, accept_set=[]):
602
        accept_set = set(accept_set)
603
        pending = self.get_pending_commissions(context=context,
604
                                               clientkey=clientkey)
605
        pending = sorted(pending)
606

    
607
        accept = self.accept_commission
608
        reject = self.reject_commission
609

    
610
        for serial in pending:
611
            if serial > max_serial:
612
                break
613

    
614
            if serial in accept_set:
615
                accept(context=context, clientkey=clientkey, serials=[serial])
616
            else:
617
                reject(context=context, clientkey=clientkey, serials=[serial])
618

    
619
        return
620

    
621
    def get_timeline(self, context=None, after="", before="Z", get_timeline=[]):
622
        holder_set = set()
623
        e_add = holder_set.add
624
        resource_set = set()
625
        r_add = resource_set.add
626

    
627
        for holder, resource in get_timeline:
628
            if holder not in holder_set:
629
                e_add(holder)
630

    
631
            r_add((holder, resource))
632

    
633
        chunk_size = 65536
634
        nr = 0
635
        timeline = []
636
        append = timeline.append
637
        filterlogs = ProvisionLog.objects.filter
638
        if holder_set:
639
            q_holder = Q(source__in=holder_set) | Q(target__in=holder_set)
640
        else:
641
            q_holder = Q()
642

    
643
        while 1:
644
            logs = filterlogs(q_holder,
645
                              issue_time__gt=after,
646
                              issue_time__lte=before,
647
                              reason__startswith='ACCEPT:')
648

    
649
            logs = logs.order_by('issue_time')
650
            #logs = logs.values()
651
            logs = logs[:chunk_size]
652
            nr += len(logs)
653
            if not logs:
654
                break
655
            for g in logs:
656
                if ((g.source, g.resource) not in resource_set
657
                    or (g.target, g.resource) not in resource_set):
658
                    continue
659

    
660
                o = {
661
                    'serial':                   g.serial,
662
                    'source':                   g.source,
663
                    'target':                   g.target,
664
                    'resource':                 g.resource,
665
                    'name':                     g.name,
666
                    'quantity':                 g.delta_quantity,
667
                    'source_allocated':         g.source_allocated(),
668
                    'source_allocated_through': g.source_allocated_through(),
669
                    'source_inbound':           g.source_inbound(),
670
                    'source_inbound_through':   g.source_inbound_through(),
671
                    'source_outbound':          g.source_outbound(),
672
                    'source_outbound_through':  g.source_outbound_through(),
673
                    'target_allocated':         g.target_allocated(),
674
                    'target_allocated_through': g.target_allocated_through(),
675
                    'target_inbound':           g.target_inbound(),
676
                    'target_inbound_through':   g.target_inbound_through(),
677
                    'target_outbound':          g.target_outbound(),
678
                    'target_outbound_through':  g.target_outbound_through(),
679
                    'issue_time':               g.issue_time,
680
                    'log_time':                 g.log_time,
681
                    'reason':                   g.reason,
682
                }
683

    
684
                append(o)
685

    
686
            after = g.issue_time
687
            if after >= before:
688
                break
689

    
690
        return timeline
691

    
692

    
693
def _add(x, y, invert=False):
694
    return x + y if not invert else x - y
695

    
696

    
697
def _isneg(x):
698
    return x < 0
699

    
700

    
701
API_Callpoint = QuotaholderDjangoDBCallpoint