Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / quotaholder / callpoint.py @ 588d107d

History | View | Annotate | Download (31.3 kB)

1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from astakos.quotaholder.exception import (
35
    QuotaholderError,
36
    CorruptedError, InvalidDataError,
37
    NoQuantityError, NoCapacityError,
38
    ExportLimitError, ImportLimitError,
39
    DuplicateError)
40

    
41
from astakos.quotaholder.utils.newname import newname
42
from astakos.quotaholder.api import QH_PRACTICALLY_INFINITE
43

    
44
from django.db.models import Q, Count
45
from django.db.models import Q
46
from .models import (Policy, Holding,
47
                     Commission, Provision, ProvisionLog, CallSerial,
48
                     now,
49
                     db_get_holding, db_get_policy,
50
                     db_get_commission, db_filter_provision, db_get_callserial)
51

    
52

    
53
class QuotaholderDjangoDBCallpoint(object):
54

    
55
    def get_limits(self, context=None, get_limits=()):
56
        limits = []
57
        append = limits.append
58

    
59
        for policy in get_limits:
60
            try:
61
                p = Policy.objects.get(policy=policy)
62
            except Policy.DoesNotExist:
63
                continue
64

    
65
            append((policy, p.quantity, p.capacity,
66
                    p.import_limit, p.export_limit))
67

    
68
        return limits
69

    
70
    def set_limits(self, context=None, set_limits=()):
71

    
72
        for (policy, quantity, capacity,
73
             import_limit, export_limit) in set_limits:
74

    
75
            try:
76
                policy = db_get_policy(policy=policy, for_update=True)
77
            except Policy.DoesNotExist:
78
                Policy.objects.create(policy=policy,
79
                                      quantity=quantity,
80
                                      capacity=capacity,
81
                                      import_limit=import_limit,
82
                                      export_limit=export_limit)
83
            else:
84
                policy.quantity = quantity
85
                policy.capacity = capacity
86
                policy.export_limit = export_limit
87
                policy.import_limit = import_limit
88
                policy.save()
89

    
90
        return ()
91

    
92
    def get_holding(self, context=None, get_holding=()):
93
        holdings = []
94
        append = holdings.append
95

    
96
        for holder, resource in get_holding:
97
            try:
98
                h = Holding.objects.get(holder=holder, resource=resource)
99
            except Holding.DoesNotExist:
100
                continue
101

    
102
            append((h.holder, h.resource, h.policy.policy,
103
                    h.imported, h.exported,
104
                    h.returned, h.released, h.flags))
105

    
106
        return holdings
107

    
108
    def set_holding(self, context=None, set_holding=()):
109
        rejected = []
110
        append = rejected.append
111

    
112
        for holder, resource, policy, flags in set_holding:
113
            try:
114
                p = Policy.objects.get(policy=policy)
115
            except Policy.DoesNotExist:
116
                append((holder, resource, policy))
117
                continue
118

    
119
            try:
120
                h = db_get_holding(holder=holder, resource=resource,
121
                                   for_update=True)
122
                h.policy = p
123
                h.flags = flags
124
                h.save()
125
            except Holding.DoesNotExist:
126
                h = Holding.objects.create(holder=holder, resource=resource,
127
                                           policy=p, flags=flags)
128

    
129
        if rejected:
130
            raise QuotaholderError(rejected)
131
        return rejected
132

    
133
    def _init_holding(self,
134
                      holder, resource, policy,
135
                      imported, exported, returned, released,
136
                      flags):
137
        try:
138
            h = db_get_holding(holder=holder, resource=resource,
139
                               for_update=True)
140
        except Holding.DoesNotExist:
141
            h = Holding(holder=holder, resource=resource)
142

    
143
        h.policy = policy
144
        h.flags = flags
145
        h.imported = imported
146
        h.importing = imported
147
        h.exported = exported
148
        h.exporting = exported
149
        h.returned = returned
150
        h.returning = returned
151
        h.released = released
152
        h.releasing = released
153
        h.save()
154

    
155
    def init_holding(self, context=None, init_holding=()):
156
        rejected = []
157
        append = rejected.append
158

    
159
        for idx, sfh in enumerate(init_holding):
160
            (holder, resource, policy,
161
             imported, exported, returned, released,
162
             flags) = sfh
163

    
164
            try:
165
                p = Policy.objects.get(policy=policy)
166
            except Policy.DoesNotExist:
167
                append(idx)
168
                continue
169

    
170
            self._init_holding(holder, resource, p,
171
                               imported, exported,
172
                               returned, released,
173
                               flags)
174
        if rejected:
175
            raise QuotaholderError(rejected)
176
        return rejected
177

    
178
    def reset_holding(self, context=None, reset_holding=()):
179
        rejected = []
180
        append = rejected.append
181

    
182
        for idx, tpl in enumerate(reset_holding):
183
            (holder, resource,
184
             imported, exported, returned, released) = tpl
185

    
186
            try:
187
                h = db_get_holding(holder=holder, resource=resource,
188
                                   for_update=True)
189
                h.imported = imported
190
                h.importing = imported
191
                h.exported = exported
192
                h.exporting = exported
193
                h.returned = returned
194
                h.returning = returned
195
                h.released = released
196
                h.releasing = released
197
                h.save()
198
            except Holding.DoesNotExist:
199
                append(idx)
200
                continue
201

    
202
        if rejected:
203
            raise QuotaholderError(rejected)
204
        return rejected
205

    
206
    def _check_pending(self, holder, resource):
207
        cs = Commission.objects.filter(holder=holder)
208
        cs = [c for c in cs if c.provisions.filter(resource=resource)]
209
        as_target = [c.serial for c in cs]
210

    
211
        ps = Provision.objects.filter(holder=holder, resource=resource)
212
        as_source = [p.serial.serial for p in ps]
213

    
214
        return as_target + as_source
215

    
216
    def _actual_quantity(self, holding):
217
        hp = holding.policy
218
        return hp.quantity + (holding.imported + holding.returned -
219
                              holding.exported - holding.released)
220

    
221
    def release_holding(self, context=None, release_holding=()):
222
        rejected = []
223
        append = rejected.append
224

    
225
        for idx, (holder, resource) in enumerate(release_holding):
226
            try:
227
                h = db_get_holding(holder=holder, resource=resource,
228
                                   for_update=True)
229
            except Holding.DoesNotExist:
230
                append(idx)
231
                continue
232

    
233
            if self._check_pending(holder, resource):
234
                append(idx)
235
                continue
236

    
237
            q = self._actual_quantity(h)
238
            if q > 0:
239
                append(idx)
240
                continue
241

    
242
            h.delete()
243

    
244
        if rejected:
245
            raise QuotaholderError(rejected)
246
        return rejected
247

    
248
    def list_resources(self, context=None, holder=None):
249
        holdings = Holding.objects.filter(holder=holder)
250
        resources = [h.resource for h in holdings]
251
        return resources
252

    
253
    def list_holdings(self, context=None, list_holdings=()):
254
        rejected = []
255
        reject = rejected.append
256
        holdings_list = []
257
        append = holdings_list.append
258

    
259
        for holder in list_holdings:
260
            holdings = list(Holding.objects.filter(holder=holder))
261
            if not holdings:
262
                reject(holder)
263
                continue
264

    
265
            append([(holder, h.resource,
266
                     h.imported, h.exported, h.returned, h.released)
267
                    for h in holdings])
268

    
269
        return holdings_list, rejected
270

    
271
    def get_quota(self, context=None, get_quota=()):
272
        quotas = []
273
        append = quotas.append
274

    
275
        holders = set(holder for holder, r in get_quota)
276
        hs = Holding.objects.select_related().filter(holder__in=holders)
277
        holdings = {}
278
        for h in hs:
279
            holdings[(h.holder, h.resource)] = h
280

    
281
        for holder, resource in get_quota:
282
            try:
283
                h = holdings[(holder, resource)]
284
            except:
285
                continue
286

    
287
            p = h.policy
288

    
289
            append((h.holder, h.resource, p.quantity, p.capacity,
290
                    p.import_limit, p.export_limit,
291
                    h.imported, h.exported,
292
                    h.returned, h.released,
293
                    h.flags))
294

    
295
        return quotas
296

    
297
    def set_quota(self, context=None, set_quota=()):
298
        rejected = []
299
        append = rejected.append
300

    
301
        q_holdings = Q()
302
        holders = []
303
        for (holder, resource, _, _, _, _, _) in set_quota:
304
            holders.append(holder)
305

    
306
        hs = Holding.objects.filter(holder__in=holders).select_for_update()
307
        holdings = {}
308
        for h in hs:
309
            holdings[(h.holder, h.resource)] = h
310

    
311
        old_policies = []
312

    
313
        for (holder, resource,
314
             quantity, capacity,
315
             import_limit, export_limit, flags) in set_quota:
316

    
317
            policy = newname('policy_')
318
            newp = Policy(policy=policy,
319
                          quantity=quantity,
320
                          capacity=capacity,
321
                          import_limit=import_limit,
322
                          export_limit=export_limit)
323

    
324
            try:
325
                h = holdings[(holder, resource)]
326
                old_policies.append(h.policy_id)
327
                h.policy = newp
328
                h.flags = flags
329
            except KeyError:
330
                h = Holding(holder=holder, resource=resource,
331
                            policy=newp, flags=flags)
332

    
333
            # the order is intentionally reversed so that it
334
            # would break if we are not within a transaction.
335
            # Has helped before.
336
            h.save()
337
            newp.save()
338
            holdings[(holder, resource)] = h
339

    
340
        objs = Policy.objects.annotate(refs=Count('holding'))
341
        objs.filter(policy__in=old_policies, refs=0).delete()
342

    
343
        if rejected:
344
            raise QuotaholderError(rejected)
345
        return rejected
346

    
347
    def add_quota(self,
348
                  context=None, clientkey=None, serial=None,
349
                  sub_quota=(), add_quota=()):
350
        rejected = []
351
        append = rejected.append
352

    
353
        if serial is not None:
354
            if clientkey is None:
355
                all_pairs = [(q[0], q[1]) for q in sub_quota + add_quota]
356
                raise QuotaholderError(all_pairs)
357
            try:
358
                cs = CallSerial.objects.get(serial=serial, clientkey=clientkey)
359
                all_pairs = [(q[0], q[1]) for q in sub_quota + add_quota]
360
                raise QuotaholderError(all_pairs)
361
            except CallSerial.DoesNotExist:
362
                pass
363

    
364
        sources = sub_quota + add_quota
365
        q_holdings = Q()
366
        holders = []
367
        for (holder, resource, _, _, _, _) in sources:
368
            holders.append(holder)
369

    
370
        hs = Holding.objects.filter(holder__in=holders).select_for_update()
371
        holdings = {}
372
        for h in hs:
373
            holdings[(h.holder, h.resource)] = h
374

    
375
        pids = [h.policy_id for h in hs]
376
        policies = Policy.objects.in_bulk(pids)
377

    
378
        old_policies = []
379

    
380
        for removing, source in [(True, sub_quota), (False, add_quota)]:
381
            for (holder, resource,
382
                 quantity, capacity,
383
                 import_limit, export_limit) in source:
384

    
385
                try:
386
                    h = holdings[(holder, resource)]
387
                    old_policies.append(h.policy_id)
388
                    try:
389
                        p = policies[h.policy_id]
390
                    except KeyError:
391
                        raise AssertionError("no policy %s" % h.policy_id)
392
                except KeyError:
393
                    if removing:
394
                        append((holder, resource))
395
                        continue
396

    
397
                    h = Holding(holder=holder, resource=resource, flags=0)
398
                    p = None
399

    
400
                policy = newname('policy_')
401
                newp = Policy(policy=policy)
402

    
403
                newp.quantity = _add(p.quantity if p else 0, quantity,
404
                                     invert=removing)
405
                newp.capacity = _add(p.capacity if p else 0, capacity,
406
                                     invert=removing)
407
                newp.import_limit = _add(p.import_limit if p else 0,
408
                                         import_limit, invert=removing)
409
                newp.export_limit = _add(p.export_limit if p else 0,
410
                                         export_limit, invert=removing)
411

    
412
                new_values = [newp.capacity,
413
                              newp.import_limit, newp.export_limit]
414
                if any(map(_isneg, new_values)):
415
                    append((holder, resource))
416
                    continue
417

    
418
                h.policy = newp
419

    
420
                # the order is intentionally reversed so that it
421
                # would break if we are not within a transaction.
422
                # Has helped before.
423
                h.save()
424
                newp.save()
425
                policies[policy] = newp
426
                holdings[(holder, resource)] = h
427

    
428
        objs = Policy.objects.annotate(refs=Count('holding'))
429
        objs.filter(policy__in=old_policies, refs=0).delete()
430

    
431
        if rejected:
432
            raise QuotaholderError(rejected)
433

    
434
        if serial is not None and clientkey is not None:
435
            CallSerial.objects.create(serial=serial, clientkey=clientkey)
436
        return rejected
437

    
438
    def ack_serials(self, context=None, clientkey=None, serials=()):
439
        if clientkey is None:
440
            return
441

    
442
        for serial in serials:
443
            try:
444
                c = db_get_callserial(clientkey=clientkey,
445
                                      serial=serial,
446
                                      for_update=True)
447
                c.delete()
448
            except CallSerial.DoesNotExist:
449
                pass
450
        return
451

    
452
    def query_serials(self, context=None, clientkey=None, serials=()):
453
        result = []
454
        append = result.append
455

    
456
        if clientkey is None:
457
            return result
458

    
459
        if not serials:
460
            cs = CallSerial.objects.filter(clientkey=clientkey)
461
            return [c.serial for c in cs]
462

    
463
        for serial in serials:
464
            try:
465
                db_get_callserial(clientkey=clientkey, serial=serial)
466
                append(serial)
467
            except CallSerial.DoesNotExist:
468
                pass
469

    
470
        return result
471

    
472
    def issue_commission(self,
473
                         context=None,
474
                         clientkey=None,
475
                         target=None,
476
                         name=None,
477
                         provisions=()):
478

    
479
        create = Commission.objects.create
480
        commission = create(holder=target, clientkey=clientkey, name=name)
481
        serial = commission.serial
482

    
483
        checked = []
484
        for holder, resource, quantity in provisions:
485

    
486
            if holder == target:
487
                m = "Cannot issue commission from an holder to itself (%s)" % (
488
                    holder,)
489
                raise InvalidDataError(m)
490

    
491
            ent_res = holder, resource
492
            if ent_res in checked:
493
                m = "Duplicate provision for %s.%s" % ent_res
494
                raise DuplicateError(m)
495
            checked.append(ent_res)
496

    
497
            release = 0
498
            if quantity < 0:
499
                release = 1
500

    
501
            # Source limits checks
502
            try:
503
                h = db_get_holding(holder=holder, resource=resource,
504
                                   for_update=True)
505
            except Holding.DoesNotExist:
506
                m = ("There is no quantity "
507
                     "to allocate from in %s.%s" % (holder, resource))
508
                raise NoQuantityError(m,
509
                                      source=holder, target=target,
510
                                      resource=resource, requested=quantity,
511
                                      current=0, limit=0)
512

    
513
            hp = h.policy
514

    
515
            if not release:
516
                current = h.exporting
517
                limit = hp.export_limit
518
                if current + quantity > limit:
519
                    m = ("Export limit reached for %s.%s" % (holder, resource))
520
                    raise ExportLimitError(m,
521
                                           source=holder,
522
                                           target=target,
523
                                           resource=resource,
524
                                           requested=quantity,
525
                                           current=current,
526
                                           limit=limit)
527

    
528
                limit = hp.quantity + h.imported - h.releasing
529
                unavailable = h.exporting - h.returned
530
                available = limit - unavailable
531

    
532
                if quantity > available:
533
                    m = ("There is not enough quantity "
534
                         "to allocate from in %s.%s" % (holder, resource))
535
                    raise NoQuantityError(m,
536
                                          source=holder,
537
                                          target=target,
538
                                          resource=resource,
539
                                          requested=quantity,
540
                                          current=unavailable,
541
                                          limit=limit)
542
            else:
543
                current = (+ h.importing + h.returning
544
                           - h.exported - h.returned)
545
                limit = hp.capacity
546
                if current - quantity > limit:
547
                    m = ("There is not enough capacity "
548
                         "to release to in %s.%s" % (holder, resource))
549
                    raise NoQuantityError(m,
550
                                          source=holder,
551
                                          target=target,
552
                                          resource=resource,
553
                                          requested=quantity,
554
                                          current=current,
555
                                          limit=limit)
556

    
557
            # Target limits checks
558
            try:
559
                th = db_get_holding(holder=target, resource=resource,
560
                                    for_update=True)
561
            except Holding.DoesNotExist:
562
                m = ("There is no capacity "
563
                     "to allocate into in %s.%s" % (target, resource))
564
                raise NoCapacityError(m,
565
                                      source=holder,
566
                                      target=target,
567
                                      resource=resource,
568
                                      requested=quantity,
569
                                      current=0,
570
                                      limit=0)
571

    
572
            tp = th.policy
573

    
574
            if not release:
575
                limit = tp.import_limit
576
                current = th.importing
577
                if current + quantity > limit:
578
                    m = ("Import limit reached for %s.%s" % (target, resource))
579
                    raise ImportLimitError(m,
580
                                           source=holder,
581
                                           target=target,
582
                                           resource=resource,
583
                                           requested=quantity,
584
                                           current=current,
585
                                           limit=limit)
586

    
587
                limit = tp.quantity + tp.capacity
588
                current = (+ th.importing + th.returning + tp.quantity
589
                           - th.exported - th.released)
590

    
591
                if current + quantity > limit:
592
                    m = ("There is not enough capacity "
593
                         "to allocate into in %s.%s" % (target, resource))
594
                    raise NoCapacityError(m,
595
                                          source=holder,
596
                                          target=target,
597
                                          resource=resource,
598
                                          requested=quantity,
599
                                          current=current,
600
                                          limit=limit)
601
            else:
602
                limit = tp.quantity + th.imported - th.releasing
603
                unavailable = th.exporting - th.returned
604
                available = limit - unavailable
605

    
606
                if available + quantity < 0:
607
                    m = ("There is not enough quantity "
608
                         "to release from in %s.%s" % (target, resource))
609
                    raise NoCapacityError(m,
610
                                          source=holder,
611
                                          target=target,
612
                                          resource=resource,
613
                                          requested=quantity,
614
                                          current=unavailable,
615
                                          limit=limit)
616

    
617
            Provision.objects.create(serial=commission,
618
                                     holder=holder,
619
                                     resource=resource,
620
                                     quantity=quantity)
621
            if release:
622
                h.returning -= quantity
623
                th.releasing -= quantity
624
            else:
625
                h.exporting += quantity
626
                th.importing += quantity
627

    
628
            h.save()
629
            th.save()
630

    
631
        return serial
632

    
633
    def _log_provision(self,
634
                       commission, s_holding, t_holding,
635
                       provision, log_time, reason):
636

    
637
        s_holder = s_holding.holder
638
        s_policy = s_holding.policy
639
        t_holder = t_holding.holder
640
        t_policy = t_holding.policy
641

    
642
        kwargs = {
643
            'serial':              commission.serial,
644
            'name':                commission.name,
645
            'source':              s_holder,
646
            'target':              t_holder,
647
            'resource':            provision.resource,
648
            'source_quantity':     s_policy.quantity,
649
            'source_capacity':     s_policy.capacity,
650
            'source_import_limit': s_policy.import_limit,
651
            'source_export_limit': s_policy.export_limit,
652
            'source_imported':     s_holding.imported,
653
            'source_exported':     s_holding.exported,
654
            'source_returned':     s_holding.returned,
655
            'source_released':     s_holding.released,
656
            'target_quantity':     t_policy.quantity,
657
            'target_capacity':     t_policy.capacity,
658
            'target_import_limit': t_policy.import_limit,
659
            'target_export_limit': t_policy.export_limit,
660
            'target_imported':     t_holding.imported,
661
            'target_exported':     t_holding.exported,
662
            'target_returned':     t_holding.returned,
663
            'target_released':     t_holding.released,
664
            'delta_quantity':      provision.quantity,
665
            'issue_time':          commission.issue_time,
666
            'log_time':            log_time,
667
            'reason':              reason,
668
        }
669

    
670
        ProvisionLog.objects.create(**kwargs)
671

    
672
    def accept_commission(self,
673
                          context=None, clientkey=None,
674
                          serials=(), reason=''):
675
        log_time = now()
676

    
677
        for serial in serials:
678
            try:
679
                c = db_get_commission(clientkey=clientkey, serial=serial,
680
                                      for_update=True)
681
            except Commission.DoesNotExist:
682
                return
683

    
684
            t = c.holder
685

    
686
            provisions = db_filter_provision(serial=serial, for_update=True)
687
            for pv in provisions:
688
                try:
689
                    h = db_get_holding(holder=pv.holder,
690
                                       resource=pv.resource, for_update=True)
691
                    th = db_get_holding(holder=t, resource=pv.resource,
692
                                        for_update=True)
693
                except Holding.DoesNotExist:
694
                    m = "Corrupted provision"
695
                    raise CorruptedError(m)
696

    
697
                quantity = pv.quantity
698
                release = 0
699
                if quantity < 0:
700
                    release = 1
701

    
702
                if release:
703
                    h.returned -= quantity
704
                    th.released -= quantity
705
                else:
706
                    h.exported += quantity
707
                    th.imported += quantity
708

    
709
                reason = 'ACCEPT:' + reason[-121:]
710
                self._log_provision(c, h, th, pv, log_time, reason)
711
                h.save()
712
                th.save()
713
                pv.delete()
714
            c.delete()
715

    
716
        return
717

    
718
    def reject_commission(self,
719
                          context=None, clientkey=None,
720
                          serials=(), reason=''):
721
        log_time = now()
722

    
723
        for serial in serials:
724
            try:
725
                c = db_get_commission(clientkey=clientkey, serial=serial,
726
                                      for_update=True)
727
            except Commission.DoesNotExist:
728
                return
729

    
730
            t = c.holder
731

    
732
            provisions = db_filter_provision(serial=serial, for_update=True)
733
            for pv in provisions:
734
                try:
735
                    h = db_get_holding(holder=pv.holder,
736
                                       resource=pv.resource, for_update=True)
737
                    th = db_get_holding(holder=t, resource=pv.resource,
738
                                        for_update=True)
739
                except Holding.DoesNotExist:
740
                    m = "Corrupted provision"
741
                    raise CorruptedError(m)
742

    
743
                quantity = pv.quantity
744
                release = 0
745
                if quantity < 0:
746
                    release = 1
747

    
748
                if release:
749
                    h.returning += quantity
750
                    th.releasing += quantity
751
                else:
752
                    h.exporting -= quantity
753
                    th.importing -= quantity
754

    
755
                reason = 'REJECT:' + reason[-121:]
756
                self._log_provision(c, h, th, pv, log_time, reason)
757
                h.save()
758
                th.save()
759
                pv.delete()
760
            c.delete()
761

    
762
        return
763

    
764
    def get_pending_commissions(self, context=None, clientkey=None):
765
        pending = Commission.objects.filter(clientkey=clientkey)
766
        pending_list = pending.values_list('serial', flat=True)
767
        return pending_list
768

    
769
    def resolve_pending_commissions(self,
770
                                    context=None, clientkey=None,
771
                                    max_serial=None, accept_set=()):
772
        accept_set = set(accept_set)
773
        pending = self.get_pending_commissions(context=context,
774
                                               clientkey=clientkey)
775
        pending = sorted(pending)
776

    
777
        accept = self.accept_commission
778
        reject = self.reject_commission
779

    
780
        for serial in pending:
781
            if serial > max_serial:
782
                break
783

    
784
            if serial in accept_set:
785
                accept(context=context, clientkey=clientkey, serials=[serial])
786
            else:
787
                reject(context=context, clientkey=clientkey, serials=[serial])
788

    
789
        return
790

    
791
    def get_timeline(self, context=None, after="", before="Z", get_timeline=()):
792
        holder_set = set()
793
        e_add = holder_set.add
794
        resource_set = set()
795
        r_add = resource_set.add
796

    
797
        for holder, resource in get_timeline:
798
            if holder not in holder_set:
799
                e_add(holder)
800

    
801
            r_add((holder, resource))
802

    
803
        chunk_size = 65536
804
        nr = 0
805
        timeline = []
806
        append = timeline.append
807
        filterlogs = ProvisionLog.objects.filter
808
        if holder_set:
809
            q_holder = Q(source__in=holder_set) | Q(target__in=holder_set)
810
        else:
811
            q_holder = Q()
812

    
813
        while 1:
814
            logs = filterlogs(q_holder,
815
                              issue_time__gt=after,
816
                              issue_time__lte=before,
817
                              reason__startswith='ACCEPT:')
818

    
819
            logs = logs.order_by('issue_time')
820
            #logs = logs.values()
821
            logs = logs[:chunk_size]
822
            nr += len(logs)
823
            if not logs:
824
                break
825
            for g in logs:
826
                if ((g.source, g.resource) not in resource_set
827
                    or (g.target, g.resource) not in resource_set):
828
                    continue
829

    
830
                o = {
831
                    'serial':                   g.serial,
832
                    'source':                   g.source,
833
                    'target':                   g.target,
834
                    'resource':                 g.resource,
835
                    'name':                     g.name,
836
                    'quantity':                 g.delta_quantity,
837
                    'source_allocated':         g.source_allocated(),
838
                    'source_allocated_through': g.source_allocated_through(),
839
                    'source_inbound':           g.source_inbound(),
840
                    'source_inbound_through':   g.source_inbound_through(),
841
                    'source_outbound':          g.source_outbound(),
842
                    'source_outbound_through':  g.source_outbound_through(),
843
                    'target_allocated':         g.target_allocated(),
844
                    'target_allocated_through': g.target_allocated_through(),
845
                    'target_inbound':           g.target_inbound(),
846
                    'target_inbound_through':   g.target_inbound_through(),
847
                    'target_outbound':          g.target_outbound(),
848
                    'target_outbound_through':  g.target_outbound_through(),
849
                    'issue_time':               g.issue_time,
850
                    'log_time':                 g.log_time,
851
                    'reason':                   g.reason,
852
                }
853

    
854
                append(o)
855

    
856
            after = g.issue_time
857
            if after >= before:
858
                break
859

    
860
        return timeline
861

    
862

    
863
def _add(x, y, invert=False):
864
    return x + y if not invert else x - y
865

    
866

    
867
def _isneg(x):
868
    return x < 0
869

    
870

    
871
API_Callpoint = QuotaholderDjangoDBCallpoint