Statistics
| Branch: | Tag: | Revision:

root / snf-quotaholder-app / quotaholder_django / quotaholder_app / callpoint.py @ 8acd9708

History | View | Annotate | Download (37.7 kB)

1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from synnefo.lib.quotaholder.api import (
35
    QuotaholderAPI,
36
    QH_PRACTICALLY_INFINITE,
37
    InvalidKeyError, NoEntityError,
38
    NoQuantityError, NoCapacityError,
39
    ExportLimitError, ImportLimitError,
40
    DuplicateError)
41

    
42
from synnefo.lib.commissioning import (
43
    Callpoint, CorruptedError, InvalidDataError, ReturnButFail)
44
from synnefo.lib.commissioning.utils.newname import newname
45

    
46
from django.db.models import Q
47
from django.db import transaction
48
from .models import (Entity, Policy, Holding,
49
                     Commission, Provision, ProvisionLog, CallSerial,
50
                     now,
51
                     db_get_entity, db_get_holding, db_get_policy,
52
                     db_get_commission, db_filter_provision, db_get_callserial)
53

    
54

    
55
class QuotaholderDjangoDBCallpoint(Callpoint):
56

    
57
    api_spec = QuotaholderAPI()
58

    
59
    http_exc_lookup = {
60
        CorruptedError:   550,
61
        InvalidDataError: 400,
62
        InvalidKeyError:  401,
63
        NoEntityError:    404,
64
        NoQuantityError:  413,
65
        NoCapacityError:  413,
66
    }
67

    
68
    def init_connection(self, connection):
69
        if connection is not None:
70
            raise ValueError("Cannot specify connection args with %s" %
71
                             type(self).__name__)
72

    
73
    def commit(self):
74
        transaction.commit()
75

    
76
    def rollback(self):
77
        transaction.rollback()
78

    
79
    def do_make_call(self, call_name, data):
80
        call_fn = getattr(self, call_name, None)
81
        if not call_fn:
82
            m = "cannot find call '%s'" % (call_name,)
83
            raise CorruptedError(m)
84

    
85
        return call_fn(**data)
86

    
87
    def create_entity(self, context={}, create_entity=()):
88
        rejected = []
89
        append = rejected.append
90

    
91
        for idx, (entity, owner, key, ownerkey) in enumerate(create_entity):
92
            try:
93
                owner = Entity.objects.get(entity=owner, key=ownerkey)
94
            except Entity.DoesNotExist:
95
                append(idx)
96
                continue
97

    
98
            try:
99
                e = Entity.objects.get(entity=entity)
100
                append(idx)
101
            except Entity.DoesNotExist:
102
                e = Entity.objects.create(entity=entity,
103
                                          owner=owner,
104
                                          key=key)
105

    
106
        if rejected:
107
            raise ReturnButFail(rejected)
108
        return rejected
109

    
110
    def set_entity_key(self, context={}, set_entity_key=()):
111
        rejected = []
112
        append = rejected.append
113

    
114
        for entity, key, newkey in set_entity_key:
115
            try:
116
                e = db_get_entity(entity=entity, key=key, for_update=True)
117
            except Entity.DoesNotExist:
118
                append(entity)
119
                continue
120

    
121
            e.key = newkey
122
            e.save()
123

    
124
        if rejected:
125
            raise ReturnButFail(rejected)
126
        return rejected
127

    
128
    def list_entities(self, context={}, entity=None, key=None):
129
        try:
130
            e = Entity.objects.get(entity=entity, key=key)
131
        except Entity.DoesNotExist:
132
            m = "Entity '%s' does not exist" % (entity,)
133
            raise NoEntityError(m)
134

    
135
        children = e.entities.all()
136
        entities = [e.entity for e in children]
137
        return entities
138

    
139
    def get_entity(self, context={}, get_entity=()):
140
        entities = []
141
        append = entities.append
142

    
143
        names = [entity for entity, key in get_entity]
144
        es = Entity.objects.select_related(depth=1).filter(entity__in=names)
145
        data = {}
146
        for e in es:
147
            data[e.entity] = e
148

    
149
        for entity, key in get_entity:
150
            e = data.get(entity, None)
151
            if e is None or e.key != key:
152
                continue
153
            append((entity, e.owner.entity))
154

    
155
        return entities
156

    
157
    def get_limits(self, context={}, get_limits=()):
158
        limits = []
159
        append = limits.append
160

    
161
        for policy in get_limits:
162
            try:
163
                p = Policy.objects.get(policy=policy)
164
            except Policy.DoesNotExist:
165
                continue
166

    
167
            append((policy, p.quantity, p.capacity,
168
                    p.import_limit, p.export_limit))
169

    
170
        return limits
171

    
172
    def set_limits(self, context={}, set_limits=()):
173

    
174
        for (policy, quantity, capacity,
175
             import_limit, export_limit) in set_limits:
176

    
177
            try:
178
                policy = db_get_policy(policy=policy, for_update=True)
179
            except Policy.DoesNotExist:
180
                Policy.objects.create(policy=policy,
181
                                      quantity=quantity,
182
                                      capacity=capacity,
183
                                      import_limit=import_limit,
184
                                      export_limit=export_limit)
185
            else:
186
                policy.quantity = quantity
187
                policy.capacity = capacity
188
                policy.export_limit = export_limit
189
                policy.import_limit = import_limit
190
                policy.save()
191

    
192
        return ()
193

    
194
    def get_holding(self, context={}, get_holding=()):
195
        holdings = []
196
        append = holdings.append
197

    
198
        for entity, resource, key in get_holding:
199
            try:
200
                h = Holding.objects.get(entity=entity, resource=resource)
201
            except Holding.DoesNotExist:
202
                continue
203

    
204
            if h.entity.key != key:
205
                continue
206

    
207
            append((h.entity.entity, h.resource, h.policy.policy,
208
                    h.imported, h.exported,
209
                    h.returned, h.released, h.flags))
210

    
211
        return holdings
212

    
213
    def _set_holding(self, entity, resource, policy, flags):
214
        try:
215
            h = db_get_holding(entity=entity, resource=resource,
216
                               for_update=True)
217
            h.policy = p
218
            h.flags = flags
219
            h.save()
220
        except Holding.DoesNotExist:
221
            h = Holding.objects.create(entity=e, resource=resource,
222
                                       policy=p, flags=flags)
223
        return h
224

    
225
    def set_holding(self, context={}, set_holding=()):
226
        rejected = []
227
        append = rejected.append
228

    
229
        for entity, resource, key, policy, flags in set_holding:
230
            try:
231
                e = Entity.objects.get(entity=entity, key=key)
232
            except Entity.DoesNotExist:
233
                append((entity, resource, policy))
234
                continue
235

    
236
            if e.key != key:
237
                append((entity, resource, policy))
238
                continue
239

    
240
            try:
241
                p = Policy.objects.get(policy=policy)
242
            except Policy.DoesNotExist:
243
                append((entity, resource, policy))
244
                continue
245

    
246
            try:
247
                h = db_get_holding(entity=entity, resource=resource,
248
                                   for_update=True)
249
                h.policy = p
250
                h.flags = flags
251
                h.save()
252
            except Holding.DoesNotExist:
253
                h = Holding.objects.create(entity=e, resource=resource,
254
                                           policy=p, flags=flags)
255

    
256
        if rejected:
257
            raise ReturnButFail(rejected)
258
        return rejected
259

    
260
    def _init_holding(self,
261
                      entity, resource, policy,
262
                      imported, exported, returned, released,
263
                      flags):
264
        try:
265
            h = db_get_holding(entity=entity, resource=resource,
266
                               for_update=True)
267
        except Holding.DoesNotExist:
268
            h = Holding(entity=entity, resource=resource)
269

    
270
        h.policy = policy
271
        h.flags = flags
272
        h.imported = imported
273
        h.importing = imported
274
        h.exported = exported
275
        h.exporting = exported
276
        h.returned = returned
277
        h.returning = returned
278
        h.released = released
279
        h.releasing = released
280
        h.save()
281

    
282
    def init_holding(self, context={}, init_holding=()):
283
        rejected = []
284
        append = rejected.append
285

    
286
        for idx, sfh in enumerate(init_holding):
287
            (entity, resource, key, policy,
288
             imported, exported, returned, released,
289
             flags) = sfh
290
            try:
291
                e = Entity.objects.get(entity=entity, key=key)
292
            except Entity.DoesNotExist:
293
                append(idx)
294
                continue
295

    
296
            if e.key != key:
297
                append(idx)
298
                continue
299

    
300
            try:
301
                p = Policy.objects.get(policy=policy)
302
            except Policy.DoesNotExist:
303
                append(idx)
304
                continue
305

    
306
            self._init_holding(e, resource, p,
307
                               imported, exported,
308
                               returned, released,
309
                               flags)
310
        if rejected:
311
            raise ReturnButFail(rejected)
312
        return rejected
313

    
314
    def reset_holding(self, context={}, reset_holding=()):
315
        rejected = []
316
        append = rejected.append
317

    
318
        for idx, tpl in enumerate(reset_holding):
319
            (entity, resource, key,
320
             imported, exported, returned, released) = tpl
321
            try:
322
                e = Entity.objects.get(entity=entity, key=key)
323
            except Entity.DoesNotExist:
324
                append(idx)
325
                continue
326

    
327
            try:
328
                h = db_get_holding(entity=entity, resource=resource,
329
                                   for_update=True)
330
                h.imported = imported
331
                h.importing = imported
332
                h.exported = exported
333
                h.exporting = exported
334
                h.returned = returned
335
                h.returning = returned
336
                h.released = released
337
                h.releasing = released
338
                h.save()
339
            except Holding.DoesNotExist:
340
                append(idx)
341
                continue
342

    
343
        if rejected:
344
            raise ReturnButFail(rejected)
345
        return rejected
346

    
347
    def _check_pending(self, entity, resource):
348
        cs = Commission.objects.filter(entity=entity)
349
        cs = [c for c in cs if c.provisions.filter(resource=resource)]
350
        as_target = [c.serial for c in cs]
351

    
352
        ps = Provision.objects.filter(entity=entity, resource=resource)
353
        as_source = [p.serial.serial for p in ps]
354

    
355
        return as_target + as_source
356

    
357
    def _actual_quantity(self, holding):
358
        hp = holding.policy
359
        return hp.quantity + (holding.imported + holding.returned -
360
                              holding.exported - holding.released)
361

    
362
    def _new_policy_name(self):
363
        return newname('policy_')
364

    
365
    def _increase_resource(self, entity, resource, amount):
366
        try:
367
            h = db_get_holding(entity=entity, resource=resource,
368
                               for_update=True)
369
        except Holding.DoesNotExist:
370
            h = Holding(entity=entity, resource=resource)
371
            p = Policy.objects.create(policy=self._new_policy_name(),
372
                                      quantity=0,
373
                                      capacity=QH_PRACTICALLY_INFINITE,
374
                                      import_limit=QH_PRACTICALLY_INFINITE,
375
                                      export_limit=QH_PRACTICALLY_INFINITE)
376
            h.policy = p
377
        h.imported += amount
378
        h.save()
379

    
380
    def release_holding(self, context={}, release_holding=()):
381
        rejected = []
382
        append = rejected.append
383

    
384
        for idx, (entity, resource, key) in enumerate(release_holding):
385
            try:
386
                h = db_get_holding(entity=entity, resource=resource,
387
                                   for_update=True)
388
            except Holding.DoesNotExist:
389
                append(idx)
390
                continue
391

    
392
            if h.entity.key != key:
393
                append(idx)
394
                continue
395

    
396
            if self._check_pending(entity, resource):
397
                append(idx)
398
                continue
399

    
400
            q = self._actual_quantity(h)
401
            if q > 0:
402
                owner = h.entity.owner
403
                self._increase_resource(owner, resource, q)
404

    
405
            h.delete()
406

    
407
        if rejected:
408
            raise ReturnButFail(rejected)
409
        return rejected
410

    
411
    def list_resources(self, context={}, entity=None, key=None):
412
        try:
413
            e = Entity.objects.get(entity=entity)
414
        except Entity.DoesNotExist:
415
            m = "No such entity '%s'" % (entity,)
416
            raise NoEntityError(m)
417

    
418
        if e.key != key:
419
            m = "Invalid key for entity '%s'" % (entity,)
420
            raise InvalidKeyError(m)
421

    
422
        holdings = e.holding_set.filter(entity=entity)
423
        resources = [h.resource for h in holdings]
424
        return resources
425

    
426
    def list_holdings(self, context={}, list_holdings=()):
427
        rejected = []
428
        reject = rejected.append
429
        holdings_list = []
430
        append = holdings_list.append
431

    
432
        for entity, key in list_holdings:
433
            try:
434
                e = Entity.objects.get(entity=entity)
435
                if e.key != key:
436
                    raise Entity.DoesNotExist("wrong key")
437
            except Entity.DoesNotExist:
438
                reject(entity)
439
                continue
440

    
441
            holdings = e.holding_set.filter(entity=entity)
442
            append([[entity, h.resource,
443
                     h.imported, h.exported, h.returned, h.released]
444
                    for h in holdings])
445

    
446
        return holdings_list, rejected
447

    
448
    def get_quota(self, context={}, get_quota=()):
449
        quotas = []
450
        append = quotas.append
451

    
452
        entities = set(e for e, r, k in get_quota)
453
        hs = Holding.objects.select_related().filter(entity__in=entities)
454
        holdings = {}
455
        for h in hs:
456
            holdings[(h.entity_id, h.resource)] = h
457

    
458
        for entity, resource, key in get_quota:
459
            try:
460
                h = holdings[(entity, resource)]
461
            except:
462
                continue
463

    
464
            if h.entity.key != key:
465
                continue
466

    
467
            p = h.policy
468

    
469
            append((h.entity.entity, h.resource, p.quantity, p.capacity,
470
                    p.import_limit, p.export_limit,
471
                    h.imported, h.exported,
472
                    h.returned, h.released,
473
                    h.flags))
474

    
475
        return quotas
476

    
477
    def set_quota(self, context={}, set_quota=()):
478
        rejected = []
479
        append = rejected.append
480

    
481
        for (entity, resource, key,
482
             quantity, capacity,
483
             import_limit, export_limit, flags) in set_quota:
484

    
485
            try:
486
                e = Entity.objects.get(entity=entity, key=key)
487
            except Entity.DoesNotExist:
488
                append((entity, resource))
489
                continue
490

    
491
            policy = newname('policy_')
492
            newp = Policy(policy=policy,
493
                          quantity=quantity,
494
                          capacity=capacity,
495
                          import_limit=import_limit,
496
                          export_limit=export_limit)
497

    
498
            try:
499
                h = db_get_holding(entity=entity, resource=resource,
500
                                   for_update=True)
501
                p = h.policy
502
                h.policy = newp
503
                h.flags = flags
504
            except Holding.DoesNotExist:
505
                h = Holding(entity=e, resource=resource,
506
                            policy=newp, flags=flags)
507
                p = None
508

    
509
            # the order is intentionally reversed so that it
510
            # would break if we are not within a transaction.
511
            # Has helped before.
512
            h.save()
513
            newp.save()
514

    
515
            if p is not None and p.holding_set.count() == 0:
516
                p.delete()
517

    
518
        if rejected:
519
            raise ReturnButFail(rejected)
520
        return rejected
521

    
522
    def add_quota(self,
523
                  context={}, clientkey=None, serial=None,
524
                  sub_quota=(), add_quota=()):
525
        rejected = []
526
        append = rejected.append
527

    
528
        if serial is not None:
529
            if clientkey is None:
530
                all_pairs = [(q[0], q[1]) for q in sub_quota + add_quota]
531
                raise ReturnButFail(all_pairs)
532
            try:
533
                cs = CallSerial.objects.get(serial=serial, clientkey=clientkey)
534
                all_pairs = [(q[0], q[1]) for q in sub_quota + add_quota]
535
                raise ReturnButFail(all_pairs)
536
            except CallSerial.DoesNotExist:
537
                pass
538

    
539
        for removing, source in [(True, sub_quota), (False, add_quota)]:
540
            for (entity, resource, key,
541
                 quantity, capacity,
542
                 import_limit, export_limit) in source:
543

    
544
                try:
545
                    e = Entity.objects.get(entity=entity, key=key)
546
                except Entity.DoesNotExist:
547
                    append((entity, resource))
548
                    continue
549

    
550
                try:
551
                    h = db_get_holding(entity=entity, resource=resource,
552
                                       for_update=True)
553
                    p = h.policy
554
                except Holding.DoesNotExist:
555
                    if removing:
556
                        append((entity, resource))
557
                        continue
558
                    h = Holding(entity=e, resource=resource, flags=0)
559
                    p = None
560

    
561
                policy = newname('policy_')
562
                newp = Policy(policy=policy)
563

    
564
                newp.quantity = _add(p.quantity if p else 0, quantity,
565
                                     invert=removing)
566
                newp.capacity = _add(p.capacity if p else 0, capacity,
567
                                     invert=removing)
568
                newp.import_limit = _add(p.import_limit if p else 0,
569
                                         import_limit, invert=removing)
570
                newp.export_limit = _add(p.export_limit if p else 0,
571
                                         export_limit, invert=removing)
572

    
573
                new_values = [newp.capacity,
574
                              newp.import_limit, newp.export_limit]
575
                if any(map(_isneg, new_values)):
576
                    append((entity, resource))
577
                    continue
578

    
579
                h.policy = newp
580

    
581
                # the order is intentionally reversed so that it
582
                # would break if we are not within a transaction.
583
                # Has helped before.
584
                h.save()
585
                newp.save()
586

    
587
                if p is not None and p.holding_set.count() == 0:
588
                    p.delete()
589

    
590
        if rejected:
591
            raise ReturnButFail(rejected)
592

    
593
        if serial is not None and clientkey is not None:
594
            CallSerial.objects.create(serial=serial, clientkey=clientkey)
595
        return rejected
596

    
597
    def ack_serials(self, context={}, clientkey=None, serials=()):
598
        if clientkey is None:
599
            return
600

    
601
        for serial in serials:
602
            try:
603
                c = db_get_callserial(clientkey=clientkey,
604
                                      serial=serial,
605
                                      for_update=True)
606
                c.delete()
607
            except CallSerial.DoesNotExist:
608
                pass
609
        return
610

    
611
    def query_serials(self, context={}, clientkey=None, serials=()):
612
        result = []
613
        append = result.append
614

    
615
        if clientkey is None:
616
            return result
617

    
618
        if not serials:
619
            cs = CallSerial.objects.filter(clientkey=clientkey)
620
            return [c.serial for c in cs]
621

    
622
        for serial in serials:
623
            try:
624
                db_get_callserial(clientkey=clientkey, serial=serial)
625
                append(serial)
626
            except CallSerial.DoesNotExist:
627
                pass
628

    
629
        return result
630

    
631
    def issue_commission(self,
632
                         context={},
633
                         clientkey=None,
634
                         target=None,
635
                         key=None,
636
                         name=None,
637
                         provisions=()):
638

    
639
        try:
640
            t = Entity.objects.get(entity=target)
641
        except Entity.DoesNotExist:
642
            m = "No target entity '%s'" % (target,)
643
            raise NoEntityError(m)
644
        else:
645
            if t.key != key:
646
                m = "Invalid key for target entity '%s'" % (target,)
647
                raise InvalidKeyError(m)
648

    
649
        create = Commission.objects.create
650
        commission = create(entity_id=target, clientkey=clientkey, name=name)
651
        serial = commission.serial
652

    
653
        checked = []
654
        for entity, resource, quantity in provisions:
655

    
656
            if entity == target:
657
                m = "Cannot issue commission from an entity to itself (%s)" % (
658
                    entity,)
659
                raise InvalidDataError(m)
660

    
661
            ent_res = entity, resource
662
            if ent_res in checked:
663
                m = "Duplicate provision for %s.%s" % ent_res
664
                raise DuplicateError(m)
665
            checked.append(ent_res)
666

    
667
            try:
668
                e = Entity.objects.get(entity=entity)
669
            except Entity.DoesNotExist:
670
                m = "No source entity '%s'" % (entity,)
671
                raise NoEntityError(m)
672

    
673
            release = 0
674
            if quantity < 0:
675
                release = 1
676

    
677
            # Source limits checks
678
            try:
679
                h = db_get_holding(entity=entity, resource=resource,
680
                                   for_update=True)
681
            except Holding.DoesNotExist:
682
                m = ("There is no quantity "
683
                     "to allocate from in %s.%s" % (entity, resource))
684
                raise NoQuantityError(m,
685
                                      source=entity, target=target,
686
                                      resource=resource, requested=quantity,
687
                                      current=0, limit=0)
688

    
689
            hp = h.policy
690

    
691
            if not release:
692
                current = h.exporting
693
                limit = hp.export_limit
694
                if current + quantity > limit:
695
                    m = ("Export limit reached for %s.%s" % (entity, resource))
696
                    raise ExportLimitError(m,
697
                                           source=entity,
698
                                           target=target,
699
                                           resource=resource,
700
                                           requested=quantity,
701
                                           current=current,
702
                                           limit=limit)
703

    
704
                limit = hp.quantity + h.imported - h.releasing
705
                unavailable = h.exporting - h.returned
706
                available = limit - unavailable
707

    
708
                if quantity > available:
709
                    m = ("There is not enough quantity "
710
                         "to allocate from in %s.%s" % (entity, resource))
711
                    raise NoQuantityError(m,
712
                                          source=entity,
713
                                          target=target,
714
                                          resource=resource,
715
                                          requested=quantity,
716
                                          current=unavailable,
717
                                          limit=limit)
718
            else:
719
                current = (+ h.importing + h.returning
720
                           - h.exported - h.returned)
721
                limit = hp.capacity
722
                if current - quantity > limit:
723
                    m = ("There is not enough capacity "
724
                         "to release to in %s.%s" % (entity, resource))
725
                    raise NoQuantityError(m,
726
                                          source=entity,
727
                                          target=target,
728
                                          resource=resource,
729
                                          requested=quantity,
730
                                          current=current,
731
                                          limit=limit)
732

    
733
            # Target limits checks
734
            try:
735
                th = db_get_holding(entity=target, resource=resource,
736
                                    for_update=True)
737
            except Holding.DoesNotExist:
738
                m = ("There is no capacity "
739
                     "to allocate into in %s.%s" % (target, resource))
740
                raise NoCapacityError(m,
741
                                      source=entity,
742
                                      target=target,
743
                                      resource=resource,
744
                                      requested=quantity,
745
                                      current=0,
746
                                      limit=0)
747

    
748
            tp = th.policy
749

    
750
            if not release:
751
                limit = tp.import_limit
752
                current = th.importing
753
                if current + quantity > limit:
754
                    m = ("Import limit reached for %s.%s" % (target, resource))
755
                    raise ImportLimitError(m,
756
                                           source=entity,
757
                                           target=target,
758
                                           resource=resource,
759
                                           requested=quantity,
760
                                           current=current,
761
                                           limit=limit)
762

    
763
                limit = tp.quantity + tp.capacity
764
                current = (+ th.importing + th.returning + tp.quantity
765
                           - th.exported - th.released)
766

    
767
                if current + quantity > limit:
768
                    m = ("There is not enough capacity "
769
                         "to allocate into in %s.%s" % (target, resource))
770
                    raise NoCapacityError(m,
771
                                          source=entity,
772
                                          target=target,
773
                                          resource=resource,
774
                                          requested=quantity,
775
                                          current=current,
776
                                          limit=limit)
777
            else:
778
                limit = tp.quantity + th.imported - th.releasing
779
                unavailable = th.exporting - th.returned
780
                available = limit - unavailable
781

    
782
                if available + quantity < 0:
783
                    m = ("There is not enough quantity "
784
                         "to release from in %s.%s" % (target, resource))
785
                    raise NoCapacityError(m,
786
                                          source=entity,
787
                                          target=target,
788
                                          resource=resource,
789
                                          requested=quantity,
790
                                          current=unavailable,
791
                                          limit=limit)
792

    
793
            Provision.objects.create(serial=commission,
794
                                     entity=e,
795
                                     resource=resource,
796
                                     quantity=quantity)
797
            if release:
798
                h.returning -= quantity
799
                th.releasing -= quantity
800
            else:
801
                h.exporting += quantity
802
                th.importing += quantity
803

    
804
            h.save()
805
            th.save()
806

    
807
        return serial
808

    
809
    def _log_provision(self,
810
                       commission, s_holding, t_holding,
811
                       provision, log_time, reason):
812

    
813
        s_entity = s_holding.entity
814
        s_policy = s_holding.policy
815
        t_entity = t_holding.entity
816
        t_policy = t_holding.policy
817

    
818
        kwargs = {
819
            'serial':              commission.serial,
820
            'name':                commission.name,
821
            'source':              s_entity.entity,
822
            'target':              t_entity.entity,
823
            'resource':            provision.resource,
824
            'source_quantity':     s_policy.quantity,
825
            'source_capacity':     s_policy.capacity,
826
            'source_import_limit': s_policy.import_limit,
827
            'source_export_limit': s_policy.export_limit,
828
            'source_imported':     s_holding.imported,
829
            'source_exported':     s_holding.exported,
830
            'source_returned':     s_holding.returned,
831
            'source_released':     s_holding.released,
832
            'target_quantity':     t_policy.quantity,
833
            'target_capacity':     t_policy.capacity,
834
            'target_import_limit': t_policy.import_limit,
835
            'target_export_limit': t_policy.export_limit,
836
            'target_imported':     t_holding.imported,
837
            'target_exported':     t_holding.exported,
838
            'target_returned':     t_holding.returned,
839
            'target_released':     t_holding.released,
840
            'delta_quantity':      provision.quantity,
841
            'issue_time':          commission.issue_time,
842
            'log_time':            log_time,
843
            'reason':              reason,
844
        }
845

    
846
        ProvisionLog.objects.create(**kwargs)
847

    
848
    def accept_commission(self,
849
                          context={}, clientkey=None,
850
                          serials=(), reason=''):
851
        log_time = now()
852

    
853
        for serial in serials:
854
            try:
855
                c = db_get_commission(clientkey=clientkey, serial=serial,
856
                                      for_update=True)
857
            except Commission.DoesNotExist:
858
                return
859

    
860
            t = c.entity
861

    
862
            provisions = db_filter_provision(serial=serial, for_update=True)
863
            for pv in provisions:
864
                try:
865
                    h = db_get_holding(entity=pv.entity.entity,
866
                                       resource=pv.resource, for_update=True)
867
                    th = db_get_holding(entity=t, resource=pv.resource,
868
                                        for_update=True)
869
                except Holding.DoesNotExist:
870
                    m = "Corrupted provision"
871
                    raise CorruptedError(m)
872

    
873
                quantity = pv.quantity
874
                release = 0
875
                if quantity < 0:
876
                    release = 1
877

    
878
                if release:
879
                    h.returned -= quantity
880
                    th.released -= quantity
881
                else:
882
                    h.exported += quantity
883
                    th.imported += quantity
884

    
885
                reason = 'ACCEPT:' + reason[-121:]
886
                self._log_provision(c, h, th, pv, log_time, reason)
887
                h.save()
888
                th.save()
889
                pv.delete()
890
            c.delete()
891

    
892
        return
893

    
894
    def reject_commission(self,
895
                          context={}, clientkey=None,
896
                          serials=(), reason=''):
897
        log_time = now()
898

    
899
        for serial in serials:
900
            try:
901
                c = db_get_commission(clientkey=clientkey, serial=serial,
902
                                      for_update=True)
903
            except Commission.DoesNotExist:
904
                return
905

    
906
            t = c.entity
907

    
908
            provisions = db_filter_provision(serial=serial, for_update=True)
909
            for pv in provisions:
910
                try:
911
                    h = db_get_holding(entity=pv.entity.entity,
912
                                       resource=pv.resource, for_update=True)
913
                    th = db_get_holding(entity=t, resource=pv.resource,
914
                                        for_update=True)
915
                except Holding.DoesNotExist:
916
                    m = "Corrupted provision"
917
                    raise CorruptedError(m)
918

    
919
                quantity = pv.quantity
920
                release = 0
921
                if quantity < 0:
922
                    release = 1
923

    
924
                if release:
925
                    h.returning += quantity
926
                    th.releasing += quantity
927
                else:
928
                    h.exporting -= quantity
929
                    th.importing -= quantity
930

    
931
                reason = 'REJECT:' + reason[-121:]
932
                self._log_provision(c, h, th, pv, log_time, reason)
933
                h.save()
934
                th.save()
935
                pv.delete()
936
            c.delete()
937

    
938
        return
939

    
940
    def get_pending_commissions(self, context={}, clientkey=None):
941
        pending = Commission.objects.filter(clientkey=clientkey)
942
        pending_list = pending.values_list('serial', flat=True)
943
        return pending_list
944

    
945
    def resolve_pending_commissions(self,
946
                                    context={}, clientkey=None,
947
                                    max_serial=None, accept_set=()):
948
        accept_set = set(accept_set)
949
        pending = self.get_pending_commissions(context=context,
950
                                               clientkey=clientkey)
951
        pending = sorted(pending)
952

    
953
        accept = self.accept_commission
954
        reject = self.reject_commission
955

    
956
        for serial in pending:
957
            if serial > max_serial:
958
                break
959

    
960
            if serial in accept_set:
961
                accept(context=context, clientkey=clientkey, serials=[serial])
962
            else:
963
                reject(context=context, clientkey=clientkey, serials=[serial])
964

    
965
        return
966

    
967
    def release_entity(self, context={}, release_entity=()):
968
        rejected = []
969
        append = rejected.append
970
        for entity, key in release_entity:
971
            try:
972
                e = db_get_entity(entity=entity, key=key, for_update=True)
973
            except Entity.DoesNotExist:
974
                append(entity)
975
                continue
976

    
977
            if e.entities.count() != 0:
978
                append(entity)
979
                continue
980

    
981
            if e.holding_set.count() != 0:
982
                append(entity)
983
                continue
984

    
985
            e.delete()
986

    
987
        if rejected:
988
            raise ReturnButFail(rejected)
989
        return rejected
990

    
991
    def get_timeline(self, context={}, after="", before="Z", get_timeline=()):
992
        entity_set = set()
993
        e_add = entity_set.add
994
        resource_set = set()
995
        r_add = resource_set.add
996

    
997
        for entity, resource, key in get_timeline:
998
            if entity not in entity_set:
999
                try:
1000
                    e = Entity.objects.get(entity=entity, key=key)
1001
                    e_add(entity)
1002
                except Entity.DoesNotExist:
1003
                    continue
1004

    
1005
            r_add((entity, resource))
1006

    
1007
        chunk_size = 65536
1008
        nr = 0
1009
        timeline = []
1010
        append = timeline.append
1011
        filterlogs = ProvisionLog.objects.filter
1012
        if entity_set:
1013
            q_entity = Q(source__in=entity_set) | Q(target__in=entity_set)
1014
        else:
1015
            q_entity = Q()
1016

    
1017
        while 1:
1018
            logs = filterlogs(q_entity,
1019
                              issue_time__gt=after,
1020
                              issue_time__lte=before,
1021
                              reason__startswith='ACCEPT:')
1022

    
1023
            logs = logs.order_by('issue_time')
1024
            #logs = logs.values()
1025
            logs = logs[:chunk_size]
1026
            nr += len(logs)
1027
            if not logs:
1028
                break
1029
            for g in logs:
1030
                if ((g.source, g.resource) not in resource_set
1031
                    or (g.target, g.resource) not in resource_set):
1032
                    continue
1033

    
1034
                o = {
1035
                    'serial':                   g.serial,
1036
                    'source':                   g.source,
1037
                    'target':                   g.target,
1038
                    'resource':                 g.resource,
1039
                    'name':                     g.name,
1040
                    'quantity':                 g.delta_quantity,
1041
                    'source_allocated':         g.source_allocated(),
1042
                    'source_allocated_through': g.source_allocated_through(),
1043
                    'source_inbound':           g.source_inbound(),
1044
                    'source_inbound_through':   g.source_inbound_through(),
1045
                    'source_outbound':          g.source_outbound(),
1046
                    'source_outbound_through':  g.source_outbound_through(),
1047
                    'target_allocated':         g.target_allocated(),
1048
                    'target_allocated_through': g.target_allocated_through(),
1049
                    'target_inbound':           g.target_inbound(),
1050
                    'target_inbound_through':   g.target_inbound_through(),
1051
                    'target_outbound':          g.target_outbound(),
1052
                    'target_outbound_through':  g.target_outbound_through(),
1053
                    'issue_time':               g.issue_time,
1054
                    'log_time':                 g.log_time,
1055
                    'reason':                   g.reason,
1056
                }
1057

    
1058
                append(o)
1059

    
1060
            after = g.issue_time
1061
            if after >= before:
1062
                break
1063

    
1064
        return timeline
1065

    
1066

    
1067
def _add(x, y, invert=False):
1068
    return x + y if not invert else x - y
1069

    
1070

    
1071
def _update(dest, source, attr, delta):
1072
    dest_attr = getattr(dest, attr)
1073
    dest_attr = _add(getattr(source, attr, 0), delta)
1074

    
1075

    
1076
def _isneg(x):
1077
    return x < 0
1078

    
1079

    
1080
API_Callpoint = QuotaholderDjangoDBCallpoint