Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / quotaholder / callpoint.py @ b0727daf

History | View | Annotate | Download (37.5 kB)

1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from astakos.quotaholder.exception import (
35
    QuotaholderError,
36
    CorruptedError, InvalidDataError,
37
    InvalidKeyError, NoEntityError,
38
    NoQuantityError, NoCapacityError,
39
    ExportLimitError, ImportLimitError,
40
    DuplicateError)
41

    
42
from astakos.quotaholder.utils.newname import newname
43
from astakos.quotaholder.api import QH_PRACTICALLY_INFINITE
44

    
45
from django.db.models import Q, Count
46
from .models import (Entity, Policy, Holding,
47
                     Commission, Provision, ProvisionLog, CallSerial,
48
                     now,
49
                     db_get_entity, db_get_holding, db_get_policy,
50
                     db_get_commission, db_filter_provision, db_get_callserial)
51

    
52

    
53
class QuotaholderDjangoDBCallpoint(object):
54

    
55
    def create_entity(self, context=None, create_entity=()):
56
        rejected = []
57
        append = rejected.append
58

    
59
        for idx, (entity, owner, key, ownerkey) in enumerate(create_entity):
60
            try:
61
                owner = Entity.objects.get(entity=owner, key=ownerkey)
62
            except Entity.DoesNotExist:
63
                append(idx)
64
                continue
65

    
66
            try:
67
                e = Entity.objects.get(entity=entity)
68
                append(idx)
69
            except Entity.DoesNotExist:
70
                e = Entity.objects.create(entity=entity,
71
                                          owner=owner,
72
                                          key=key)
73

    
74
        if rejected:
75
            raise QuotaholderError(rejected)
76
        return rejected
77

    
78
    def set_entity_key(self, context=None, set_entity_key=()):
79
        rejected = []
80
        append = rejected.append
81

    
82
        for entity, key, newkey in set_entity_key:
83
            try:
84
                e = db_get_entity(entity=entity, key=key, for_update=True)
85
            except Entity.DoesNotExist:
86
                append(entity)
87
                continue
88

    
89
            e.key = newkey
90
            e.save()
91

    
92
        if rejected:
93
            raise QuotaholderError(rejected)
94
        return rejected
95

    
96
    def list_entities(self, context=None, entity=None, key=None):
97
        try:
98
            e = Entity.objects.get(entity=entity, key=key)
99
        except Entity.DoesNotExist:
100
            m = "Entity '%s' does not exist" % (entity,)
101
            raise NoEntityError(m)
102

    
103
        children = e.entities.all()
104
        entities = [e.entity for e in children]
105
        return entities
106

    
107
    def get_entity(self, context=None, get_entity=()):
108
        entities = []
109
        append = entities.append
110

    
111
        names = [entity for entity, key in get_entity]
112
        es = Entity.objects.select_related(depth=1).filter(entity__in=names)
113
        data = {}
114
        for e in es:
115
            data[e.entity] = e
116

    
117
        for entity, key in get_entity:
118
            e = data.get(entity, None)
119
            if e is None or e.key != key:
120
                continue
121
            append((entity, e.owner.entity))
122

    
123
        return entities
124

    
125
    def get_limits(self, context=None, get_limits=()):
126
        limits = []
127
        append = limits.append
128

    
129
        for policy in get_limits:
130
            try:
131
                p = Policy.objects.get(policy=policy)
132
            except Policy.DoesNotExist:
133
                continue
134

    
135
            append((policy, p.quantity, p.capacity,
136
                    p.import_limit, p.export_limit))
137

    
138
        return limits
139

    
140
    def set_limits(self, context=None, set_limits=()):
141

    
142
        for (policy, quantity, capacity,
143
             import_limit, export_limit) in set_limits:
144

    
145
            try:
146
                policy = db_get_policy(policy=policy, for_update=True)
147
            except Policy.DoesNotExist:
148
                Policy.objects.create(policy=policy,
149
                                      quantity=quantity,
150
                                      capacity=capacity,
151
                                      import_limit=import_limit,
152
                                      export_limit=export_limit)
153
            else:
154
                policy.quantity = quantity
155
                policy.capacity = capacity
156
                policy.export_limit = export_limit
157
                policy.import_limit = import_limit
158
                policy.save()
159

    
160
        return ()
161

    
162
    def get_holding(self, context=None, get_holding=()):
163
        holdings = []
164
        append = holdings.append
165

    
166
        for entity, resource, key in get_holding:
167
            try:
168
                h = Holding.objects.get(entity=entity, resource=resource)
169
            except Holding.DoesNotExist:
170
                continue
171

    
172
            if h.entity.key != key:
173
                continue
174

    
175
            append((h.entity.entity, h.resource, h.policy.policy,
176
                    h.imported, h.exported,
177
                    h.returned, h.released, h.flags))
178

    
179
        return holdings
180

    
181
    def set_holding(self, context=None, set_holding=()):
182
        rejected = []
183
        append = rejected.append
184

    
185
        for entity, resource, key, policy, flags in set_holding:
186
            try:
187
                e = Entity.objects.get(entity=entity, key=key)
188
            except Entity.DoesNotExist:
189
                append((entity, resource, policy))
190
                continue
191

    
192
            if e.key != key:
193
                append((entity, resource, policy))
194
                continue
195

    
196
            try:
197
                p = Policy.objects.get(policy=policy)
198
            except Policy.DoesNotExist:
199
                append((entity, resource, policy))
200
                continue
201

    
202
            try:
203
                h = db_get_holding(entity=entity, resource=resource,
204
                                   for_update=True)
205
                h.policy = p
206
                h.flags = flags
207
                h.save()
208
            except Holding.DoesNotExist:
209
                h = Holding.objects.create(entity=e, resource=resource,
210
                                           policy=p, flags=flags)
211

    
212
        if rejected:
213
            raise QuotaholderError(rejected)
214
        return rejected
215

    
216
    def _init_holding(self,
217
                      entity, resource, policy,
218
                      imported, exported, returned, released,
219
                      flags):
220
        try:
221
            h = db_get_holding(entity=entity, resource=resource,
222
                               for_update=True)
223
        except Holding.DoesNotExist:
224
            h = Holding(entity=entity, resource=resource)
225

    
226
        h.policy = policy
227
        h.flags = flags
228
        h.imported = imported
229
        h.importing = imported
230
        h.exported = exported
231
        h.exporting = exported
232
        h.returned = returned
233
        h.returning = returned
234
        h.released = released
235
        h.releasing = released
236
        h.save()
237

    
238
    def init_holding(self, context=None, init_holding=()):
239
        rejected = []
240
        append = rejected.append
241

    
242
        for idx, sfh in enumerate(init_holding):
243
            (entity, resource, key, policy,
244
             imported, exported, returned, released,
245
             flags) = sfh
246
            try:
247
                e = Entity.objects.get(entity=entity, key=key)
248
            except Entity.DoesNotExist:
249
                append(idx)
250
                continue
251

    
252
            if e.key != key:
253
                append(idx)
254
                continue
255

    
256
            try:
257
                p = Policy.objects.get(policy=policy)
258
            except Policy.DoesNotExist:
259
                append(idx)
260
                continue
261

    
262
            self._init_holding(e, resource, p,
263
                               imported, exported,
264
                               returned, released,
265
                               flags)
266
        if rejected:
267
            raise QuotaholderError(rejected)
268
        return rejected
269

    
270
    def reset_holding(self, context=None, reset_holding=()):
271
        rejected = []
272
        append = rejected.append
273

    
274
        for idx, tpl in enumerate(reset_holding):
275
            (entity, resource, key,
276
             imported, exported, returned, released) = tpl
277
            try:
278
                e = Entity.objects.get(entity=entity, key=key)
279
            except Entity.DoesNotExist:
280
                append(idx)
281
                continue
282

    
283
            try:
284
                h = db_get_holding(entity=entity, resource=resource,
285
                                   for_update=True)
286
                h.imported = imported
287
                h.importing = imported
288
                h.exported = exported
289
                h.exporting = exported
290
                h.returned = returned
291
                h.returning = returned
292
                h.released = released
293
                h.releasing = released
294
                h.save()
295
            except Holding.DoesNotExist:
296
                append(idx)
297
                continue
298

    
299
        if rejected:
300
            raise QuotaholderError(rejected)
301
        return rejected
302

    
303
    def _check_pending(self, entity, resource):
304
        cs = Commission.objects.filter(entity=entity)
305
        cs = [c for c in cs if c.provisions.filter(resource=resource)]
306
        as_target = [c.serial for c in cs]
307

    
308
        ps = Provision.objects.filter(entity=entity, resource=resource)
309
        as_source = [p.serial.serial for p in ps]
310

    
311
        return as_target + as_source
312

    
313
    def _actual_quantity(self, holding):
314
        hp = holding.policy
315
        return hp.quantity + (holding.imported + holding.returned -
316
                              holding.exported - holding.released)
317

    
318
    def _new_policy_name(self):
319
        return newname('policy_')
320

    
321
    def _increase_resource(self, entity, resource, amount):
322
        try:
323
            h = db_get_holding(entity=entity, resource=resource,
324
                               for_update=True)
325
        except Holding.DoesNotExist:
326
            h = Holding(entity=entity, resource=resource)
327
            p = Policy.objects.create(policy=self._new_policy_name(),
328
                                      quantity=0,
329
                                      capacity=QH_PRACTICALLY_INFINITE,
330
                                      import_limit=QH_PRACTICALLY_INFINITE,
331
                                      export_limit=QH_PRACTICALLY_INFINITE)
332
            h.policy = p
333
        h.imported += amount
334
        h.save()
335

    
336
    def release_holding(self, context=None, release_holding=()):
337
        rejected = []
338
        append = rejected.append
339

    
340
        for idx, (entity, resource, key) in enumerate(release_holding):
341
            try:
342
                h = db_get_holding(entity=entity, resource=resource,
343
                                   for_update=True)
344
            except Holding.DoesNotExist:
345
                append(idx)
346
                continue
347

    
348
            if h.entity.key != key:
349
                append(idx)
350
                continue
351

    
352
            if self._check_pending(entity, resource):
353
                append(idx)
354
                continue
355

    
356
            q = self._actual_quantity(h)
357
            if q > 0:
358
                owner = h.entity.owner
359
                self._increase_resource(owner, resource, q)
360

    
361
            h.delete()
362

    
363
        if rejected:
364
            raise QuotaholderError(rejected)
365
        return rejected
366

    
367
    def list_resources(self, context=None, entity=None, key=None):
368
        try:
369
            e = Entity.objects.get(entity=entity)
370
        except Entity.DoesNotExist:
371
            m = "No such entity '%s'" % (entity,)
372
            raise NoEntityError(m)
373

    
374
        if e.key != key:
375
            m = "Invalid key for entity '%s'" % (entity,)
376
            raise InvalidKeyError(m)
377

    
378
        holdings = e.holding_set.filter(entity=entity)
379
        resources = [h.resource for h in holdings]
380
        return resources
381

    
382
    def list_holdings(self, context=None, list_holdings=()):
383
        rejected = []
384
        reject = rejected.append
385
        holdings_list = []
386
        append = holdings_list.append
387

    
388
        for entity, key in list_holdings:
389
            try:
390
                e = Entity.objects.get(entity=entity)
391
                if e.key != key:
392
                    raise Entity.DoesNotExist("wrong key")
393
            except Entity.DoesNotExist:
394
                reject(entity)
395
                continue
396

    
397
            holdings = e.holding_set.filter(entity=entity)
398
            append([[entity, h.resource,
399
                     h.imported, h.exported, h.returned, h.released]
400
                    for h in holdings])
401

    
402
        return holdings_list, rejected
403

    
404
    def get_quota(self, context=None, get_quota=()):
405
        quotas = []
406
        append = quotas.append
407

    
408
        entities = set(e for e, r, k in get_quota)
409
        hs = Holding.objects.select_related().filter(entity__in=entities)
410
        holdings = {}
411
        for h in hs:
412
            holdings[(h.entity_id, h.resource)] = h
413

    
414
        for entity, resource, key in get_quota:
415
            try:
416
                h = holdings[(entity, resource)]
417
            except:
418
                continue
419

    
420
            if h.entity.key != key:
421
                continue
422

    
423
            p = h.policy
424

    
425
            append((h.entity.entity, h.resource, p.quantity, p.capacity,
426
                    p.import_limit, p.export_limit,
427
                    h.imported, h.exported,
428
                    h.returned, h.released,
429
                    h.flags))
430

    
431
        return quotas
432

    
433
    def set_quota(self, context=None, set_quota=()):
434
        rejected = []
435
        append = rejected.append
436

    
437
        q_holdings = Q()
438
        entities = []
439
        for (entity, resource, key, _, _, _, _, _) in set_quota:
440
            entities.append(entity)
441

    
442
        hs = Holding.objects.filter(entity__in=entities).select_for_update()
443
        holdings = {}
444
        for h in hs:
445
            holdings[(h.entity_id, h.resource)] = h
446

    
447
        entities = Entity.objects.in_bulk(entities)
448

    
449
        old_policies = []
450

    
451
        for (entity, resource, key,
452
             quantity, capacity,
453
             import_limit, export_limit, flags) in set_quota:
454

    
455
            e = entities.get(entity, None)
456
            if e is None or e.key != key:
457
                append((entity, resource))
458
                continue
459

    
460
            policy = newname('policy_')
461
            newp = Policy(policy=policy,
462
                          quantity=quantity,
463
                          capacity=capacity,
464
                          import_limit=import_limit,
465
                          export_limit=export_limit)
466

    
467
            try:
468
                h = holdings[(entity, resource)]
469
                old_policies.append(h.policy_id)
470
                h.policy = newp
471
                h.flags = flags
472
            except KeyError:
473
                h = Holding(entity=e, resource=resource,
474
                            policy=newp, flags=flags)
475

    
476
            # the order is intentionally reversed so that it
477
            # would break if we are not within a transaction.
478
            # Has helped before.
479
            h.save()
480
            newp.save()
481
            holdings[(entity, resource)] = h
482

    
483
        objs = Policy.objects.annotate(refs=Count('holding'))
484
        objs.filter(policy__in=old_policies, refs=0).delete()
485

    
486
        if rejected:
487
            raise QuotaholderError(rejected)
488
        return rejected
489

    
490
    def add_quota(self,
491
                  context=None, clientkey=None, serial=None,
492
                  sub_quota=(), add_quota=()):
493
        rejected = []
494
        append = rejected.append
495

    
496
        if serial is not None:
497
            if clientkey is None:
498
                all_pairs = [(q[0], q[1]) for q in sub_quota + add_quota]
499
                raise QuotaholderError(all_pairs)
500
            try:
501
                cs = CallSerial.objects.get(serial=serial, clientkey=clientkey)
502
                all_pairs = [(q[0], q[1]) for q in sub_quota + add_quota]
503
                raise QuotaholderError(all_pairs)
504
            except CallSerial.DoesNotExist:
505
                pass
506

    
507
        sources = sub_quota + add_quota
508
        q_holdings = Q()
509
        entities = []
510
        for (entity, resource, key, _, _, _, _) in sources:
511
            entities.append(entity)
512

    
513
        hs = Holding.objects.filter(entity__in=entities).select_for_update()
514
        holdings = {}
515
        for h in hs:
516
            holdings[(h.entity_id, h.resource)] = h
517

    
518
        entities = Entity.objects.in_bulk(entities)
519

    
520
        pids = [h.policy_id for h in hs]
521
        policies = Policy.objects.in_bulk(pids)
522

    
523
        old_policies = []
524

    
525
        for removing, source in [(True, sub_quota), (False, add_quota)]:
526
            for (entity, resource, key,
527
                 quantity, capacity,
528
                 import_limit, export_limit) in source:
529

    
530
                e = entities.get(entity, None)
531
                if e is None or e.key != key:
532
                    append((entity, resource))
533
                    continue
534

    
535
                try:
536
                    h = holdings[(entity, resource)]
537
                    old_policies.append(h.policy_id)
538
                    try:
539
                        p = policies[h.policy_id]
540
                    except KeyError:
541
                        raise AssertionError("no policy %s" % h.policy_id)
542
                except KeyError:
543
                    if removing:
544
                        append((entity, resource))
545
                        continue
546
                    h = Holding(entity=e, resource=resource, flags=0)
547
                    p = None
548

    
549
                policy = newname('policy_')
550
                newp = Policy(policy=policy)
551

    
552
                newp.quantity = _add(p.quantity if p else 0, quantity,
553
                                     invert=removing)
554
                newp.capacity = _add(p.capacity if p else 0, capacity,
555
                                     invert=removing)
556
                newp.import_limit = _add(p.import_limit if p else 0,
557
                                         import_limit, invert=removing)
558
                newp.export_limit = _add(p.export_limit if p else 0,
559
                                         export_limit, invert=removing)
560

    
561
                new_values = [newp.capacity,
562
                              newp.import_limit, newp.export_limit]
563
                if any(map(_isneg, new_values)):
564
                    append((entity, resource))
565
                    continue
566

    
567
                h.policy = newp
568

    
569
                # the order is intentionally reversed so that it
570
                # would break if we are not within a transaction.
571
                # Has helped before.
572
                h.save()
573
                newp.save()
574
                policies[policy] = newp
575
                holdings[(entity, resource)] = h
576

    
577
        objs = Policy.objects.annotate(refs=Count('holding'))
578
        objs.filter(policy__in=old_policies, refs=0).delete()
579

    
580
        if rejected:
581
            raise QuotaholderError(rejected)
582

    
583
        if serial is not None and clientkey is not None:
584
            CallSerial.objects.create(serial=serial, clientkey=clientkey)
585
        return rejected
586

    
587
    def ack_serials(self, context=None, clientkey=None, serials=()):
588
        if clientkey is None:
589
            return
590

    
591
        for serial in serials:
592
            try:
593
                c = db_get_callserial(clientkey=clientkey,
594
                                      serial=serial,
595
                                      for_update=True)
596
                c.delete()
597
            except CallSerial.DoesNotExist:
598
                pass
599
        return
600

    
601
    def query_serials(self, context=None, clientkey=None, serials=()):
602
        result = []
603
        append = result.append
604

    
605
        if clientkey is None:
606
            return result
607

    
608
        if not serials:
609
            cs = CallSerial.objects.filter(clientkey=clientkey)
610
            return [c.serial for c in cs]
611

    
612
        for serial in serials:
613
            try:
614
                db_get_callserial(clientkey=clientkey, serial=serial)
615
                append(serial)
616
            except CallSerial.DoesNotExist:
617
                pass
618

    
619
        return result
620

    
621
    def issue_commission(self,
622
                         context=None,
623
                         clientkey=None,
624
                         target=None,
625
                         key=None,
626
                         name=None,
627
                         provisions=()):
628

    
629
        try:
630
            t = Entity.objects.get(entity=target)
631
        except Entity.DoesNotExist:
632
            m = "No target entity '%s'" % (target,)
633
            raise NoEntityError(m)
634
        else:
635
            if t.key != key:
636
                m = "Invalid key for target entity '%s'" % (target,)
637
                raise InvalidKeyError(m)
638

    
639
        create = Commission.objects.create
640
        commission = create(entity_id=target, clientkey=clientkey, name=name)
641
        serial = commission.serial
642

    
643
        checked = []
644
        for entity, resource, quantity in provisions:
645

    
646
            if entity == target:
647
                m = "Cannot issue commission from an entity to itself (%s)" % (
648
                    entity,)
649
                raise InvalidDataError(m)
650

    
651
            ent_res = entity, resource
652
            if ent_res in checked:
653
                m = "Duplicate provision for %s.%s" % ent_res
654
                raise DuplicateError(m)
655
            checked.append(ent_res)
656

    
657
            try:
658
                e = Entity.objects.get(entity=entity)
659
            except Entity.DoesNotExist:
660
                m = "No source entity '%s'" % (entity,)
661
                raise NoEntityError(m)
662

    
663
            release = 0
664
            if quantity < 0:
665
                release = 1
666

    
667
            # Source limits checks
668
            try:
669
                h = db_get_holding(entity=entity, resource=resource,
670
                                   for_update=True)
671
            except Holding.DoesNotExist:
672
                m = ("There is no quantity "
673
                     "to allocate from in %s.%s" % (entity, resource))
674
                raise NoQuantityError(m,
675
                                      source=entity, target=target,
676
                                      resource=resource, requested=quantity,
677
                                      current=0, limit=0)
678

    
679
            hp = h.policy
680

    
681
            if not release:
682
                current = h.exporting
683
                limit = hp.export_limit
684
                if current + quantity > limit:
685
                    m = ("Export limit reached for %s.%s" % (entity, resource))
686
                    raise ExportLimitError(m,
687
                                           source=entity,
688
                                           target=target,
689
                                           resource=resource,
690
                                           requested=quantity,
691
                                           current=current,
692
                                           limit=limit)
693

    
694
                limit = hp.quantity + h.imported - h.releasing
695
                unavailable = h.exporting - h.returned
696
                available = limit - unavailable
697

    
698
                if quantity > available:
699
                    m = ("There is not enough quantity "
700
                         "to allocate from in %s.%s" % (entity, resource))
701
                    raise NoQuantityError(m,
702
                                          source=entity,
703
                                          target=target,
704
                                          resource=resource,
705
                                          requested=quantity,
706
                                          current=unavailable,
707
                                          limit=limit)
708
            else:
709
                current = (+ h.importing + h.returning
710
                           - h.exported - h.returned)
711
                limit = hp.capacity
712
                if current - quantity > limit:
713
                    m = ("There is not enough capacity "
714
                         "to release to in %s.%s" % (entity, resource))
715
                    raise NoQuantityError(m,
716
                                          source=entity,
717
                                          target=target,
718
                                          resource=resource,
719
                                          requested=quantity,
720
                                          current=current,
721
                                          limit=limit)
722

    
723
            # Target limits checks
724
            try:
725
                th = db_get_holding(entity=target, resource=resource,
726
                                    for_update=True)
727
            except Holding.DoesNotExist:
728
                m = ("There is no capacity "
729
                     "to allocate into in %s.%s" % (target, resource))
730
                raise NoCapacityError(m,
731
                                      source=entity,
732
                                      target=target,
733
                                      resource=resource,
734
                                      requested=quantity,
735
                                      current=0,
736
                                      limit=0)
737

    
738
            tp = th.policy
739

    
740
            if not release:
741
                limit = tp.import_limit
742
                current = th.importing
743
                if current + quantity > limit:
744
                    m = ("Import limit reached for %s.%s" % (target, resource))
745
                    raise ImportLimitError(m,
746
                                           source=entity,
747
                                           target=target,
748
                                           resource=resource,
749
                                           requested=quantity,
750
                                           current=current,
751
                                           limit=limit)
752

    
753
                limit = tp.quantity + tp.capacity
754
                current = (+ th.importing + th.returning + tp.quantity
755
                           - th.exported - th.released)
756

    
757
                if current + quantity > limit:
758
                    m = ("There is not enough capacity "
759
                         "to allocate into in %s.%s" % (target, resource))
760
                    raise NoCapacityError(m,
761
                                          source=entity,
762
                                          target=target,
763
                                          resource=resource,
764
                                          requested=quantity,
765
                                          current=current,
766
                                          limit=limit)
767
            else:
768
                limit = tp.quantity + th.imported - th.releasing
769
                unavailable = th.exporting - th.returned
770
                available = limit - unavailable
771

    
772
                if available + quantity < 0:
773
                    m = ("There is not enough quantity "
774
                         "to release from in %s.%s" % (target, resource))
775
                    raise NoCapacityError(m,
776
                                          source=entity,
777
                                          target=target,
778
                                          resource=resource,
779
                                          requested=quantity,
780
                                          current=unavailable,
781
                                          limit=limit)
782

    
783
            Provision.objects.create(serial=commission,
784
                                     entity=e,
785
                                     resource=resource,
786
                                     quantity=quantity)
787
            if release:
788
                h.returning -= quantity
789
                th.releasing -= quantity
790
            else:
791
                h.exporting += quantity
792
                th.importing += quantity
793

    
794
            h.save()
795
            th.save()
796

    
797
        return serial
798

    
799
    def _log_provision(self,
800
                       commission, s_holding, t_holding,
801
                       provision, log_time, reason):
802

    
803
        s_entity = s_holding.entity
804
        s_policy = s_holding.policy
805
        t_entity = t_holding.entity
806
        t_policy = t_holding.policy
807

    
808
        kwargs = {
809
            'serial':              commission.serial,
810
            'name':                commission.name,
811
            'source':              s_entity.entity,
812
            'target':              t_entity.entity,
813
            'resource':            provision.resource,
814
            'source_quantity':     s_policy.quantity,
815
            'source_capacity':     s_policy.capacity,
816
            'source_import_limit': s_policy.import_limit,
817
            'source_export_limit': s_policy.export_limit,
818
            'source_imported':     s_holding.imported,
819
            'source_exported':     s_holding.exported,
820
            'source_returned':     s_holding.returned,
821
            'source_released':     s_holding.released,
822
            'target_quantity':     t_policy.quantity,
823
            'target_capacity':     t_policy.capacity,
824
            'target_import_limit': t_policy.import_limit,
825
            'target_export_limit': t_policy.export_limit,
826
            'target_imported':     t_holding.imported,
827
            'target_exported':     t_holding.exported,
828
            'target_returned':     t_holding.returned,
829
            'target_released':     t_holding.released,
830
            'delta_quantity':      provision.quantity,
831
            'issue_time':          commission.issue_time,
832
            'log_time':            log_time,
833
            'reason':              reason,
834
        }
835

    
836
        ProvisionLog.objects.create(**kwargs)
837

    
838
    def accept_commission(self,
839
                          context=None, clientkey=None,
840
                          serials=(), reason=''):
841
        log_time = now()
842

    
843
        for serial in serials:
844
            try:
845
                c = db_get_commission(clientkey=clientkey, serial=serial,
846
                                      for_update=True)
847
            except Commission.DoesNotExist:
848
                return
849

    
850
            t = c.entity
851

    
852
            provisions = db_filter_provision(serial=serial, for_update=True)
853
            for pv in provisions:
854
                try:
855
                    h = db_get_holding(entity=pv.entity.entity,
856
                                       resource=pv.resource, for_update=True)
857
                    th = db_get_holding(entity=t, resource=pv.resource,
858
                                        for_update=True)
859
                except Holding.DoesNotExist:
860
                    m = "Corrupted provision"
861
                    raise CorruptedError(m)
862

    
863
                quantity = pv.quantity
864
                release = 0
865
                if quantity < 0:
866
                    release = 1
867

    
868
                if release:
869
                    h.returned -= quantity
870
                    th.released -= quantity
871
                else:
872
                    h.exported += quantity
873
                    th.imported += quantity
874

    
875
                reason = 'ACCEPT:' + reason[-121:]
876
                self._log_provision(c, h, th, pv, log_time, reason)
877
                h.save()
878
                th.save()
879
                pv.delete()
880
            c.delete()
881

    
882
        return
883

    
884
    def reject_commission(self,
885
                          context=None, clientkey=None,
886
                          serials=(), reason=''):
887
        log_time = now()
888

    
889
        for serial in serials:
890
            try:
891
                c = db_get_commission(clientkey=clientkey, serial=serial,
892
                                      for_update=True)
893
            except Commission.DoesNotExist:
894
                return
895

    
896
            t = c.entity
897

    
898
            provisions = db_filter_provision(serial=serial, for_update=True)
899
            for pv in provisions:
900
                try:
901
                    h = db_get_holding(entity=pv.entity.entity,
902
                                       resource=pv.resource, for_update=True)
903
                    th = db_get_holding(entity=t, resource=pv.resource,
904
                                        for_update=True)
905
                except Holding.DoesNotExist:
906
                    m = "Corrupted provision"
907
                    raise CorruptedError(m)
908

    
909
                quantity = pv.quantity
910
                release = 0
911
                if quantity < 0:
912
                    release = 1
913

    
914
                if release:
915
                    h.returning += quantity
916
                    th.releasing += quantity
917
                else:
918
                    h.exporting -= quantity
919
                    th.importing -= quantity
920

    
921
                reason = 'REJECT:' + reason[-121:]
922
                self._log_provision(c, h, th, pv, log_time, reason)
923
                h.save()
924
                th.save()
925
                pv.delete()
926
            c.delete()
927

    
928
        return
929

    
930
    def get_pending_commissions(self, context=None, clientkey=None):
931
        pending = Commission.objects.filter(clientkey=clientkey)
932
        pending_list = pending.values_list('serial', flat=True)
933
        return pending_list
934

    
935
    def resolve_pending_commissions(self,
936
                                    context=None, clientkey=None,
937
                                    max_serial=None, accept_set=()):
938
        accept_set = set(accept_set)
939
        pending = self.get_pending_commissions(context=context,
940
                                               clientkey=clientkey)
941
        pending = sorted(pending)
942

    
943
        accept = self.accept_commission
944
        reject = self.reject_commission
945

    
946
        for serial in pending:
947
            if serial > max_serial:
948
                break
949

    
950
            if serial in accept_set:
951
                accept(context=context, clientkey=clientkey, serials=[serial])
952
            else:
953
                reject(context=context, clientkey=clientkey, serials=[serial])
954

    
955
        return
956

    
957
    def release_entity(self, context=None, release_entity=()):
958
        rejected = []
959
        append = rejected.append
960
        for entity, key in release_entity:
961
            try:
962
                e = db_get_entity(entity=entity, key=key, for_update=True)
963
            except Entity.DoesNotExist:
964
                append(entity)
965
                continue
966

    
967
            if e.entities.count() != 0:
968
                append(entity)
969
                continue
970

    
971
            if e.holding_set.count() != 0:
972
                append(entity)
973
                continue
974

    
975
            e.delete()
976

    
977
        if rejected:
978
            raise QuotaholderError(rejected)
979
        return rejected
980

    
981
    def get_timeline(self, context=None, after="", before="Z", get_timeline=()):
982
        entity_set = set()
983
        e_add = entity_set.add
984
        resource_set = set()
985
        r_add = resource_set.add
986

    
987
        for entity, resource, key in get_timeline:
988
            if entity not in entity_set:
989
                try:
990
                    e = Entity.objects.get(entity=entity, key=key)
991
                    e_add(entity)
992
                except Entity.DoesNotExist:
993
                    continue
994

    
995
            r_add((entity, resource))
996

    
997
        chunk_size = 65536
998
        nr = 0
999
        timeline = []
1000
        append = timeline.append
1001
        filterlogs = ProvisionLog.objects.filter
1002
        if entity_set:
1003
            q_entity = Q(source__in=entity_set) | Q(target__in=entity_set)
1004
        else:
1005
            q_entity = Q()
1006

    
1007
        while 1:
1008
            logs = filterlogs(q_entity,
1009
                              issue_time__gt=after,
1010
                              issue_time__lte=before,
1011
                              reason__startswith='ACCEPT:')
1012

    
1013
            logs = logs.order_by('issue_time')
1014
            #logs = logs.values()
1015
            logs = logs[:chunk_size]
1016
            nr += len(logs)
1017
            if not logs:
1018
                break
1019
            for g in logs:
1020
                if ((g.source, g.resource) not in resource_set
1021
                    or (g.target, g.resource) not in resource_set):
1022
                    continue
1023

    
1024
                o = {
1025
                    'serial':                   g.serial,
1026
                    'source':                   g.source,
1027
                    'target':                   g.target,
1028
                    'resource':                 g.resource,
1029
                    'name':                     g.name,
1030
                    'quantity':                 g.delta_quantity,
1031
                    'source_allocated':         g.source_allocated(),
1032
                    'source_allocated_through': g.source_allocated_through(),
1033
                    'source_inbound':           g.source_inbound(),
1034
                    'source_inbound_through':   g.source_inbound_through(),
1035
                    'source_outbound':          g.source_outbound(),
1036
                    'source_outbound_through':  g.source_outbound_through(),
1037
                    'target_allocated':         g.target_allocated(),
1038
                    'target_allocated_through': g.target_allocated_through(),
1039
                    'target_inbound':           g.target_inbound(),
1040
                    'target_inbound_through':   g.target_inbound_through(),
1041
                    'target_outbound':          g.target_outbound(),
1042
                    'target_outbound_through':  g.target_outbound_through(),
1043
                    'issue_time':               g.issue_time,
1044
                    'log_time':                 g.log_time,
1045
                    'reason':                   g.reason,
1046
                }
1047

    
1048
                append(o)
1049

    
1050
            after = g.issue_time
1051
            if after >= before:
1052
                break
1053

    
1054
        return timeline
1055

    
1056

    
1057
def _add(x, y, invert=False):
1058
    return x + y if not invert else x - y
1059

    
1060

    
1061
def _isneg(x):
1062
    return x < 0
1063

    
1064

    
1065
API_Callpoint = QuotaholderDjangoDBCallpoint