Statistics
| Branch: | Tag: | Revision:

root / snf-quotaholder-app / quotaholder_django / quotaholder_app / callpoint.py @ 6e4e100b

History | View | Annotate | Download (37.4 kB)

1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from synnefo.lib.quotaholder.api import (
35
    QuotaholderAPI,
36
    QH_PRACTICALLY_INFINITE,
37
    InvalidKeyError, NoEntityError,
38
    NoQuantityError, NoCapacityError,
39
    ExportLimitError, ImportLimitError,
40
    DuplicateError)
41

    
42
from synnefo.lib.commissioning import (
43
    Callpoint, CorruptedError, InvalidDataError, ReturnButFail)
44
from synnefo.lib.commissioning.utils.newname import newname
45

    
46
from django.db.models import Q
47
from django.db import transaction
48
from .models import (Entity, Policy, Holding,
49
                     Commission, Provision, ProvisionLog, CallSerial,
50
                     now,
51
                     db_get_entity, db_get_holding, db_get_policy,
52
                     db_get_commission, db_filter_provision, db_get_callserial)
53

    
54

    
55
class QuotaholderDjangoDBCallpoint(Callpoint):
56

    
57
    api_spec = QuotaholderAPI()
58

    
59
    http_exc_lookup = {
60
        CorruptedError:   550,
61
        InvalidDataError: 400,
62
        InvalidKeyError:  401,
63
        NoEntityError:    404,
64
        NoQuantityError:  413,
65
        NoCapacityError:  413,
66
    }
67

    
68
    def init_connection(self, connection):
69
        if connection is not None:
70
            raise ValueError("Cannot specify connection args with %s" %
71
                             type(self).__name__)
72

    
73
    def commit(self):
74
        transaction.commit()
75

    
76
    def rollback(self):
77
        transaction.rollback()
78

    
79
    def do_make_call(self, call_name, data):
80
        call_fn = getattr(self, call_name, None)
81
        if not call_fn:
82
            m = "cannot find call '%s'" % (call_name,)
83
            raise CorruptedError(m)
84

    
85
        return call_fn(**data)
86

    
87
    def create_entity(self, context={}, create_entity=()):
88
        rejected = []
89
        append = rejected.append
90

    
91
        for idx, (entity, owner, key, ownerkey) in enumerate(create_entity):
92
            try:
93
                owner = Entity.objects.get(entity=owner, key=ownerkey)
94
            except Entity.DoesNotExist:
95
                append(idx)
96
                continue
97

    
98
            try:
99
                e = Entity.objects.get(entity=entity)
100
                append(idx)
101
            except Entity.DoesNotExist:
102
                e = Entity.objects.create(entity=entity,
103
                                          owner=owner,
104
                                          key=key)
105

    
106
        if rejected:
107
            raise ReturnButFail(rejected)
108
        return rejected
109

    
110
    def set_entity_key(self, context={}, set_entity_key=()):
111
        rejected = []
112
        append = rejected.append
113

    
114
        for entity, key, newkey in set_entity_key:
115
            try:
116
                e = db_get_entity(entity=entity, key=key, for_update=True)
117
            except Entity.DoesNotExist:
118
                append(entity)
119
                continue
120

    
121
            e.key = newkey
122
            e.save()
123

    
124
        if rejected:
125
            raise ReturnButFail(rejected)
126
        return rejected
127

    
128
    def list_entities(self, context={}, entity=None, key=None):
129
        try:
130
            e = Entity.objects.get(entity=entity, key=key)
131
        except Entity.DoesNotExist:
132
            m = "Entity '%s' does not exist" % (entity,)
133
            raise NoEntityError(m)
134

    
135
        children = e.entities.all()
136
        entities = [e.entity for e in children]
137
        return entities
138

    
139
    def get_entity(self, context={}, get_entity=()):
140
        entities = []
141
        append = entities.append
142

    
143
        for entity, key in get_entity:
144
            try:
145
                e = Entity.objects.get(entity=entity, key=key)
146
            except Entity.DoesNotExist:
147
                continue
148

    
149
            append((entity, e.owner.entity))
150

    
151
        return entities
152

    
153
    def get_limits(self, context={}, get_limits=()):
154
        limits = []
155
        append = limits.append
156

    
157
        for policy in get_limits:
158
            try:
159
                p = Policy.objects.get(policy=policy)
160
            except Policy.DoesNotExist:
161
                continue
162

    
163
            append((policy, p.quantity, p.capacity,
164
                    p.import_limit, p.export_limit))
165

    
166
        return limits
167

    
168
    def set_limits(self, context={}, set_limits=()):
169

    
170
        for (policy, quantity, capacity,
171
             import_limit, export_limit) in set_limits:
172

    
173
            try:
174
                policy = db_get_policy(policy=policy, for_update=True)
175
            except Policy.DoesNotExist:
176
                Policy.objects.create(policy=policy,
177
                                      quantity=quantity,
178
                                      capacity=capacity,
179
                                      import_limit=import_limit,
180
                                      export_limit=export_limit)
181
            else:
182
                policy.quantity = quantity
183
                policy.capacity = capacity
184
                policy.export_limit = export_limit
185
                policy.import_limit = import_limit
186
                policy.save()
187

    
188
        return ()
189

    
190
    def get_holding(self, context={}, get_holding=()):
191
        holdings = []
192
        append = holdings.append
193

    
194
        for entity, resource, key in get_holding:
195
            try:
196
                h = Holding.objects.get(entity=entity, resource=resource)
197
            except Holding.DoesNotExist:
198
                continue
199

    
200
            if h.entity.key != key:
201
                continue
202

    
203
            append((h.entity.entity, h.resource, h.policy.policy,
204
                    h.imported, h.exported,
205
                    h.returned, h.released, h.flags))
206

    
207
        return holdings
208

    
209
    def _set_holding(self, entity, resource, policy, flags):
210
        try:
211
            h = db_get_holding(entity=entity, resource=resource,
212
                               for_update=True)
213
            h.policy = p
214
            h.flags = flags
215
            h.save()
216
        except Holding.DoesNotExist:
217
            h = Holding.objects.create(entity=e, resource=resource,
218
                                       policy=p, flags=flags)
219
        return h
220

    
221
    def set_holding(self, context={}, set_holding=()):
222
        rejected = []
223
        append = rejected.append
224

    
225
        for entity, resource, key, policy, flags in set_holding:
226
            try:
227
                e = Entity.objects.get(entity=entity, key=key)
228
            except Entity.DoesNotExist:
229
                append((entity, resource, policy))
230
                continue
231

    
232
            if e.key != key:
233
                append((entity, resource, policy))
234
                continue
235

    
236
            try:
237
                p = Policy.objects.get(policy=policy)
238
            except Policy.DoesNotExist:
239
                append((entity, resource, policy))
240
                continue
241

    
242
            try:
243
                h = db_get_holding(entity=entity, resource=resource,
244
                                   for_update=True)
245
                h.policy = p
246
                h.flags = flags
247
                h.save()
248
            except Holding.DoesNotExist:
249
                h = Holding.objects.create(entity=e, resource=resource,
250
                                           policy=p, flags=flags)
251

    
252
        if rejected:
253
            raise ReturnButFail(rejected)
254
        return rejected
255

    
256
    def _init_holding(self,
257
                      entity, resource, policy,
258
                      imported, exported, returned, released,
259
                      flags):
260
        try:
261
            h = db_get_holding(entity=entity, resource=resource,
262
                               for_update=True)
263
        except Holding.DoesNotExist:
264
            h = Holding(entity=entity, resource=resource)
265

    
266
        h.policy = policy
267
        h.flags = flags
268
        h.imported = imported
269
        h.importing = imported
270
        h.exported = exported
271
        h.exporting = exported
272
        h.returned = returned
273
        h.returning = returned
274
        h.released = released
275
        h.releasing = released
276
        h.save()
277

    
278
    def init_holding(self, context={}, init_holding=()):
279
        rejected = []
280
        append = rejected.append
281

    
282
        for idx, sfh in enumerate(init_holding):
283
            (entity, resource, key, policy,
284
             imported, exported, returned, released,
285
             flags) = sfh
286
            try:
287
                e = Entity.objects.get(entity=entity, key=key)
288
            except Entity.DoesNotExist:
289
                append(idx)
290
                continue
291

    
292
            if e.key != key:
293
                append(idx)
294
                continue
295

    
296
            try:
297
                p = Policy.objects.get(policy=policy)
298
            except Policy.DoesNotExist:
299
                append(idx)
300
                continue
301

    
302
            self._init_holding(e, resource, p,
303
                               imported, exported,
304
                               returned, released,
305
                               flags)
306
        if rejected:
307
            raise ReturnButFail(rejected)
308
        return rejected
309

    
310
    def reset_holding(self, context={}, reset_holding=()):
311
        rejected = []
312
        append = rejected.append
313

    
314
        for idx, tpl in enumerate(reset_holding):
315
            (entity, resource, key,
316
             imported, exported, returned, released) = tpl
317
            try:
318
                e = Entity.objects.get(entity=entity, key=key)
319
            except Entity.DoesNotExist:
320
                append(idx)
321
                continue
322

    
323
            try:
324
                h = db_get_holding(entity=entity, resource=resource,
325
                                   for_update=True)
326
                h.imported = imported
327
                h.importing = imported
328
                h.exported = exported
329
                h.exporting = exported
330
                h.returned = returned
331
                h.returning = returned
332
                h.released = released
333
                h.releasing = released
334
                h.save()
335
            except Holding.DoesNotExist:
336
                append(idx)
337
                continue
338

    
339
        if rejected:
340
            raise ReturnButFail(rejected)
341
        return rejected
342

    
343
    def _check_pending(self, entity, resource):
344
        cs = Commission.objects.filter(entity=entity)
345
        cs = [c for c in cs if c.provisions.filter(resource=resource)]
346
        as_target = [c.serial for c in cs]
347

    
348
        ps = Provision.objects.filter(entity=entity, resource=resource)
349
        as_source = [p.serial.serial for p in ps]
350

    
351
        return as_target + as_source
352

    
353
    def _actual_quantity(self, holding):
354
        hp = holding.policy
355
        return hp.quantity + (holding.imported + holding.returned -
356
                              holding.exported - holding.released)
357

    
358
    def _new_policy_name(self):
359
        return newname('policy_')
360

    
361
    def _increase_resource(self, entity, resource, amount):
362
        try:
363
            h = db_get_holding(entity=entity, resource=resource,
364
                               for_update=True)
365
        except Holding.DoesNotExist:
366
            h = Holding(entity=entity, resource=resource)
367
            p = Policy.objects.create(policy=self._new_policy_name(),
368
                                      quantity=0,
369
                                      capacity=QH_PRACTICALLY_INFINITE,
370
                                      import_limit=QH_PRACTICALLY_INFINITE,
371
                                      export_limit=QH_PRACTICALLY_INFINITE)
372
            h.policy = p
373
        h.imported += amount
374
        h.save()
375

    
376
    def release_holding(self, context={}, release_holding=()):
377
        rejected = []
378
        append = rejected.append
379

    
380
        for idx, (entity, resource, key) in enumerate(release_holding):
381
            try:
382
                h = db_get_holding(entity=entity, resource=resource,
383
                                   for_update=True)
384
            except Holding.DoesNotExist:
385
                append(idx)
386
                continue
387

    
388
            if h.entity.key != key:
389
                append(idx)
390
                continue
391

    
392
            if self._check_pending(entity, resource):
393
                append(idx)
394
                continue
395

    
396
            q = self._actual_quantity(h)
397
            if q > 0:
398
                owner = h.entity.owner
399
                self._increase_resource(owner, resource, q)
400

    
401
            h.delete()
402

    
403
        if rejected:
404
            raise ReturnButFail(rejected)
405
        return rejected
406

    
407
    def list_resources(self, context={}, entity=None, key=None):
408
        try:
409
            e = Entity.objects.get(entity=entity)
410
        except Entity.DoesNotExist:
411
            m = "No such entity '%s'" % (entity,)
412
            raise NoEntityError(m)
413

    
414
        if e.key != key:
415
            m = "Invalid key for entity '%s'" % (entity,)
416
            raise InvalidKeyError(m)
417

    
418
        holdings = e.holding_set.filter(entity=entity)
419
        resources = [h.resource for h in holdings]
420
        return resources
421

    
422
    def list_holdings(self, context={}, list_holdings=()):
423
        rejected = []
424
        reject = rejected.append
425
        holdings_list = []
426
        append = holdings_list.append
427

    
428
        for entity, key in list_holdings:
429
            try:
430
                e = Entity.objects.get(entity=entity)
431
                if e.key != key:
432
                    raise Entity.DoesNotExist("wrong key")
433
            except Entity.DoesNotExist:
434
                reject(entity)
435
                continue
436

    
437
            holdings = e.holding_set.filter(entity=entity)
438
            append([[entity, h.resource,
439
                     h.imported, h.exported, h.returned, h.released]
440
                    for h in holdings])
441

    
442
        return holdings_list, rejected
443

    
444
    def get_quota(self, context={}, get_quota=()):
445
        quotas = []
446
        append = quotas.append
447

    
448
        for entity, resource, key in get_quota:
449
            try:
450
                h = Holding.objects.get(entity=entity, resource=resource)
451
            except Holding.DoesNotExist:
452
                continue
453

    
454
            if h.entity.key != key:
455
                continue
456

    
457
            p = h.policy
458

    
459
            append((h.entity.entity, h.resource, p.quantity, p.capacity,
460
                    p.import_limit, p.export_limit,
461
                    h.imported, h.exported,
462
                    h.returned, h.released,
463
                    h.flags))
464

    
465
        return quotas
466

    
467
    def set_quota(self, context={}, set_quota=()):
468
        rejected = []
469
        append = rejected.append
470

    
471
        for (entity, resource, key,
472
             quantity, capacity,
473
             import_limit, export_limit, flags) in set_quota:
474

    
475
            try:
476
                e = Entity.objects.get(entity=entity, key=key)
477
            except Entity.DoesNotExist:
478
                append((entity, resource))
479
                continue
480

    
481
            policy = newname('policy_')
482
            newp = Policy(policy=policy,
483
                          quantity=quantity,
484
                          capacity=capacity,
485
                          import_limit=import_limit,
486
                          export_limit=export_limit)
487

    
488
            try:
489
                h = db_get_holding(entity=entity, resource=resource,
490
                                   for_update=True)
491
                p = h.policy
492
                h.policy = newp
493
                h.flags = flags
494
            except Holding.DoesNotExist:
495
                h = Holding(entity=e, resource=resource,
496
                            policy=newp, flags=flags)
497
                p = None
498

    
499
            # the order is intentionally reversed so that it
500
            # would break if we are not within a transaction.
501
            # Has helped before.
502
            h.save()
503
            newp.save()
504

    
505
            if p is not None and p.holding_set.count() == 0:
506
                p.delete()
507

    
508
        if rejected:
509
            raise ReturnButFail(rejected)
510
        return rejected
511

    
512
    def add_quota(self,
513
                  context={}, clientkey=None, serial=None,
514
                  sub_quota=(), add_quota=()):
515
        rejected = []
516
        append = rejected.append
517

    
518
        if serial is not None:
519
            if clientkey is None:
520
                all_pairs = [(q[0], q[1]) for q in sub_quota + add_quota]
521
                raise ReturnButFail(all_pairs)
522
            try:
523
                cs = CallSerial.objects.get(serial=serial, clientkey=clientkey)
524
                all_pairs = [(q[0], q[1]) for q in sub_quota + add_quota]
525
                raise ReturnButFail(all_pairs)
526
            except CallSerial.DoesNotExist:
527
                pass
528

    
529
        for removing, source in [(True, sub_quota), (False, add_quota)]:
530
            for (entity, resource, key,
531
                 quantity, capacity,
532
                 import_limit, export_limit) in source:
533

    
534
                try:
535
                    e = Entity.objects.get(entity=entity, key=key)
536
                except Entity.DoesNotExist:
537
                    append((entity, resource))
538
                    continue
539

    
540
                try:
541
                    h = db_get_holding(entity=entity, resource=resource,
542
                                       for_update=True)
543
                    p = h.policy
544
                except Holding.DoesNotExist:
545
                    if removing:
546
                        append((entity, resource))
547
                        continue
548
                    h = Holding(entity=e, resource=resource, flags=0)
549
                    p = None
550

    
551
                policy = newname('policy_')
552
                newp = Policy(policy=policy)
553

    
554
                newp.quantity = _add(p.quantity if p else 0, quantity,
555
                                     invert=removing)
556
                newp.capacity = _add(p.capacity if p else 0, capacity,
557
                                     invert=removing)
558
                newp.import_limit = _add(p.import_limit if p else 0,
559
                                         import_limit, invert=removing)
560
                newp.export_limit = _add(p.export_limit if p else 0,
561
                                         export_limit, invert=removing)
562

    
563
                new_values = [newp.capacity,
564
                              newp.import_limit, newp.export_limit]
565
                if any(map(_isneg, new_values)):
566
                    append((entity, resource))
567
                    continue
568

    
569
                h.policy = newp
570

    
571
                # the order is intentionally reversed so that it
572
                # would break if we are not within a transaction.
573
                # Has helped before.
574
                h.save()
575
                newp.save()
576

    
577
                if p is not None and p.holding_set.count() == 0:
578
                    p.delete()
579

    
580
        if rejected:
581
            raise ReturnButFail(rejected)
582

    
583
        if serial is not None and clientkey is not None:
584
            CallSerial.objects.create(serial=serial, clientkey=clientkey)
585
        return rejected
586

    
587
    def ack_serials(self, context={}, clientkey=None, serials=()):
588
        if clientkey is None:
589
            return
590

    
591
        for serial in serials:
592
            try:
593
                c = db_get_callserial(clientkey=clientkey,
594
                                      serial=serial,
595
                                      for_update=True)
596
                c.delete()
597
            except CallSerial.DoesNotExist:
598
                pass
599
        return
600

    
601
    def query_serials(self, context={}, clientkey=None, serials=()):
602
        result = []
603
        append = result.append
604

    
605
        if clientkey is None:
606
            return result
607

    
608
        if not serials:
609
            cs = CallSerial.objects.filter(clientkey=clientkey)
610
            return [c.serial for c in cs]
611

    
612
        for serial in serials:
613
            try:
614
                db_get_callserial(clientkey=clientkey, serial=serial)
615
                append(serial)
616
            except CallSerial.DoesNotExist:
617
                pass
618

    
619
        return result
620

    
621
    def issue_commission(self,
622
                         context={},
623
                         clientkey=None,
624
                         target=None,
625
                         key=None,
626
                         name=None,
627
                         provisions=()):
628

    
629
        try:
630
            t = Entity.objects.get(entity=target)
631
        except Entity.DoesNotExist:
632
            m = "No target entity '%s'" % (target,)
633
            raise NoEntityError(m)
634
        else:
635
            if t.key != key:
636
                m = "Invalid key for target entity '%s'" % (target,)
637
                raise InvalidKeyError(m)
638

    
639
        create = Commission.objects.create
640
        commission = create(entity_id=target, clientkey=clientkey, name=name)
641
        serial = commission.serial
642

    
643
        checked = []
644
        for entity, resource, quantity in provisions:
645

    
646
            if entity == target:
647
                m = "Cannot issue commission from an entity to itself (%s)" % (
648
                    entity,)
649
                raise InvalidDataError(m)
650

    
651
            ent_res = entity, resource
652
            if ent_res in checked:
653
                m = "Duplicate provision for %s.%s" % ent_res
654
                raise DuplicateError(m)
655
            checked.append(ent_res)
656

    
657
            try:
658
                e = Entity.objects.get(entity=entity)
659
            except Entity.DoesNotExist:
660
                m = "No source entity '%s'" % (entity,)
661
                raise NoEntityError(m)
662

    
663
            release = 0
664
            if quantity < 0:
665
                release = 1
666

    
667
            # Source limits checks
668
            try:
669
                h = db_get_holding(entity=entity, resource=resource,
670
                                   for_update=True)
671
            except Holding.DoesNotExist:
672
                m = ("There is no quantity "
673
                     "to allocate from in %s.%s" % (entity, resource))
674
                raise NoQuantityError(m,
675
                                      source=entity, target=target,
676
                                      resource=resource, requested=quantity,
677
                                      current=0, limit=0)
678

    
679
            hp = h.policy
680

    
681
            if not release:
682
                current = h.exporting
683
                limit = hp.export_limit
684
                if current + quantity > limit:
685
                    m = ("Export limit reached for %s.%s" % (entity, resource))
686
                    raise ExportLimitError(m,
687
                                           source=entity,
688
                                           target=target,
689
                                           resource=resource,
690
                                           requested=quantity,
691
                                           current=current,
692
                                           limit=limit)
693

    
694
                limit = hp.quantity + h.imported - h.releasing
695
                unavailable = h.exporting - h.returned
696
                available = limit - unavailable
697

    
698
                if quantity > available:
699
                    m = ("There is not enough quantity "
700
                         "to allocate from in %s.%s" % (entity, resource))
701
                    raise NoQuantityError(m,
702
                                          source=entity,
703
                                          target=target,
704
                                          resource=resource,
705
                                          requested=quantity,
706
                                          current=unavailable,
707
                                          limit=limit)
708
            else:
709
                current = (+ h.importing + h.returning
710
                           - h.exported - h.returned)
711
                limit = hp.capacity
712
                if current - quantity > limit:
713
                    m = ("There is not enough capacity "
714
                         "to release to in %s.%s" % (entity, resource))
715
                    raise NoQuantityError(m,
716
                                          source=entity,
717
                                          target=target,
718
                                          resource=resource,
719
                                          requested=quantity,
720
                                          current=current,
721
                                          limit=limit)
722

    
723
            # Target limits checks
724
            try:
725
                th = db_get_holding(entity=target, resource=resource,
726
                                    for_update=True)
727
            except Holding.DoesNotExist:
728
                m = ("There is no capacity "
729
                     "to allocate into in %s.%s" % (target, resource))
730
                raise NoCapacityError(m,
731
                                      source=entity,
732
                                      target=target,
733
                                      resource=resource,
734
                                      requested=quantity,
735
                                      current=0,
736
                                      limit=0)
737

    
738
            tp = th.policy
739

    
740
            if not release:
741
                limit = tp.import_limit
742
                current = th.importing
743
                if current + quantity > limit:
744
                    m = ("Import limit reached for %s.%s" % (target, resource))
745
                    raise ImportLimitError(m,
746
                                           source=entity,
747
                                           target=target,
748
                                           resource=resource,
749
                                           requested=quantity,
750
                                           current=current,
751
                                           limit=limit)
752

    
753
                limit = tp.quantity + tp.capacity
754
                current = (+ th.importing + th.returning + tp.quantity
755
                           - th.exported - th.released)
756

    
757
                if current + quantity > limit:
758
                    m = ("There is not enough capacity "
759
                         "to allocate into in %s.%s" % (target, resource))
760
                    raise NoCapacityError(m,
761
                                          source=entity,
762
                                          target=target,
763
                                          resource=resource,
764
                                          requested=quantity,
765
                                          current=current,
766
                                          limit=limit)
767
            else:
768
                limit = tp.quantity + th.imported - th.releasing
769
                unavailable = th.exporting - th.returned
770
                available = limit - unavailable
771

    
772
                if available + quantity < 0:
773
                    m = ("There is not enough quantity "
774
                         "to release from in %s.%s" % (target, resource))
775
                    raise NoCapacityError(m,
776
                                          source=entity,
777
                                          target=target,
778
                                          resource=resource,
779
                                          requested=quantity,
780
                                          current=unavailable,
781
                                          limit=limit)
782

    
783
            Provision.objects.create(serial=commission,
784
                                     entity=e,
785
                                     resource=resource,
786
                                     quantity=quantity)
787
            if release:
788
                h.returning -= quantity
789
                th.releasing -= quantity
790
            else:
791
                h.exporting += quantity
792
                th.importing += quantity
793

    
794
            h.save()
795
            th.save()
796

    
797
        return serial
798

    
799
    def _log_provision(self,
800
                       commission, s_holding, t_holding,
801
                       provision, log_time, reason):
802

    
803
        s_entity = s_holding.entity
804
        s_policy = s_holding.policy
805
        t_entity = t_holding.entity
806
        t_policy = t_holding.policy
807

    
808
        kwargs = {
809
            'serial':              commission.serial,
810
            'name':                commission.name,
811
            'source':              s_entity.entity,
812
            'target':              t_entity.entity,
813
            'resource':            provision.resource,
814
            'source_quantity':     s_policy.quantity,
815
            'source_capacity':     s_policy.capacity,
816
            'source_import_limit': s_policy.import_limit,
817
            'source_export_limit': s_policy.export_limit,
818
            'source_imported':     s_holding.imported,
819
            'source_exported':     s_holding.exported,
820
            'source_returned':     s_holding.returned,
821
            'source_released':     s_holding.released,
822
            'target_quantity':     t_policy.quantity,
823
            'target_capacity':     t_policy.capacity,
824
            'target_import_limit': t_policy.import_limit,
825
            'target_export_limit': t_policy.export_limit,
826
            'target_imported':     t_holding.imported,
827
            'target_exported':     t_holding.exported,
828
            'target_returned':     t_holding.returned,
829
            'target_released':     t_holding.released,
830
            'delta_quantity':      provision.quantity,
831
            'issue_time':          commission.issue_time,
832
            'log_time':            log_time,
833
            'reason':              reason,
834
        }
835

    
836
        ProvisionLog.objects.create(**kwargs)
837

    
838
    def accept_commission(self,
839
                          context={}, clientkey=None,
840
                          serials=(), reason=''):
841
        log_time = now()
842

    
843
        for serial in serials:
844
            try:
845
                c = db_get_commission(clientkey=clientkey, serial=serial,
846
                                      for_update=True)
847
            except Commission.DoesNotExist:
848
                return
849

    
850
            t = c.entity
851

    
852
            provisions = db_filter_provision(serial=serial, for_update=True)
853
            for pv in provisions:
854
                try:
855
                    h = db_get_holding(entity=pv.entity.entity,
856
                                       resource=pv.resource, for_update=True)
857
                    th = db_get_holding(entity=t, resource=pv.resource,
858
                                        for_update=True)
859
                except Holding.DoesNotExist:
860
                    m = "Corrupted provision"
861
                    raise CorruptedError(m)
862

    
863
                quantity = pv.quantity
864
                release = 0
865
                if quantity < 0:
866
                    release = 1
867

    
868
                if release:
869
                    h.returned -= quantity
870
                    th.released -= quantity
871
                else:
872
                    h.exported += quantity
873
                    th.imported += quantity
874

    
875
                reason = 'ACCEPT:' + reason[-121:]
876
                self._log_provision(c, h, th, pv, log_time, reason)
877
                h.save()
878
                th.save()
879
                pv.delete()
880
            c.delete()
881

    
882
        return
883

    
884
    def reject_commission(self,
885
                          context={}, clientkey=None,
886
                          serials=(), reason=''):
887
        log_time = now()
888

    
889
        for serial in serials:
890
            try:
891
                c = db_get_commission(clientkey=clientkey, serial=serial,
892
                                      for_update=True)
893
            except Commission.DoesNotExist:
894
                return
895

    
896
            t = c.entity
897

    
898
            provisions = db_filter_provision(serial=serial, for_update=True)
899
            for pv in provisions:
900
                try:
901
                    h = db_get_holding(entity=pv.entity.entity,
902
                                       resource=pv.resource, for_update=True)
903
                    th = db_get_holding(entity=t, resource=pv.resource,
904
                                        for_update=True)
905
                except Holding.DoesNotExist:
906
                    m = "Corrupted provision"
907
                    raise CorruptedError(m)
908

    
909
                quantity = pv.quantity
910
                release = 0
911
                if quantity < 0:
912
                    release = 1
913

    
914
                if release:
915
                    h.returning += quantity
916
                    th.releasing += quantity
917
                else:
918
                    h.exporting -= quantity
919
                    th.importing -= quantity
920

    
921
                reason = 'REJECT:' + reason[-121:]
922
                self._log_provision(c, h, th, pv, log_time, reason)
923
                h.save()
924
                th.save()
925
                pv.delete()
926
            c.delete()
927

    
928
        return
929

    
930
    def get_pending_commissions(self, context={}, clientkey=None):
931
        pending = Commission.objects.filter(clientkey=clientkey)
932
        pending_list = pending.values_list('serial', flat=True)
933
        return pending_list
934

    
935
    def resolve_pending_commissions(self,
936
                                    context={}, clientkey=None,
937
                                    max_serial=None, accept_set=()):
938
        accept_set = set(accept_set)
939
        pending = self.get_pending_commissions(context=context,
940
                                               clientkey=clientkey)
941
        pending = sorted(pending)
942

    
943
        accept = self.accept_commission
944
        reject = self.reject_commission
945

    
946
        for serial in pending:
947
            if serial > max_serial:
948
                break
949

    
950
            if serial in accept_set:
951
                accept(context=context, clientkey=clientkey, serials=[serial])
952
            else:
953
                reject(context=context, clientkey=clientkey, serials=[serial])
954

    
955
        return
956

    
957
    def release_entity(self, context={}, release_entity=()):
958
        rejected = []
959
        append = rejected.append
960
        for entity, key in release_entity:
961
            try:
962
                e = db_get_entity(entity=entity, key=key, for_update=True)
963
            except Entity.DoesNotExist:
964
                append(entity)
965
                continue
966

    
967
            if e.entities.count() != 0:
968
                append(entity)
969
                continue
970

    
971
            if e.holding_set.count() != 0:
972
                append(entity)
973
                continue
974

    
975
            e.delete()
976

    
977
        if rejected:
978
            raise ReturnButFail(rejected)
979
        return rejected
980

    
981
    def get_timeline(self, context={}, after="", before="Z", get_timeline=()):
982
        entity_set = set()
983
        e_add = entity_set.add
984
        resource_set = set()
985
        r_add = resource_set.add
986

    
987
        for entity, resource, key in get_timeline:
988
            if entity not in entity_set:
989
                try:
990
                    e = Entity.objects.get(entity=entity, key=key)
991
                    e_add(entity)
992
                except Entity.DoesNotExist:
993
                    continue
994

    
995
            r_add((entity, resource))
996

    
997
        chunk_size = 65536
998
        nr = 0
999
        timeline = []
1000
        append = timeline.append
1001
        filterlogs = ProvisionLog.objects.filter
1002
        if entity_set:
1003
            q_entity = Q(source__in=entity_set) | Q(target__in=entity_set)
1004
        else:
1005
            q_entity = Q()
1006

    
1007
        while 1:
1008
            logs = filterlogs(q_entity,
1009
                              issue_time__gt=after,
1010
                              issue_time__lte=before,
1011
                              reason__startswith='ACCEPT:')
1012

    
1013
            logs = logs.order_by('issue_time')
1014
            #logs = logs.values()
1015
            logs = logs[:chunk_size]
1016
            nr += len(logs)
1017
            if not logs:
1018
                break
1019
            for g in logs:
1020
                if ((g.source, g.resource) not in resource_set
1021
                    or (g.target, g.resource) not in resource_set):
1022
                    continue
1023

    
1024
                o = {
1025
                    'serial':                   g.serial,
1026
                    'source':                   g.source,
1027
                    'target':                   g.target,
1028
                    'resource':                 g.resource,
1029
                    'name':                     g.name,
1030
                    'quantity':                 g.delta_quantity,
1031
                    'source_allocated':         g.source_allocated(),
1032
                    'source_allocated_through': g.source_allocated_through(),
1033
                    'source_inbound':           g.source_inbound(),
1034
                    'source_inbound_through':   g.source_inbound_through(),
1035
                    'source_outbound':          g.source_outbound(),
1036
                    'source_outbound_through':  g.source_outbound_through(),
1037
                    'target_allocated':         g.target_allocated(),
1038
                    'target_allocated_through': g.target_allocated_through(),
1039
                    'target_inbound':           g.target_inbound(),
1040
                    'target_inbound_through':   g.target_inbound_through(),
1041
                    'target_outbound':          g.target_outbound(),
1042
                    'target_outbound_through':  g.target_outbound_through(),
1043
                    'issue_time':               g.issue_time,
1044
                    'log_time':                 g.log_time,
1045
                    'reason':                   g.reason,
1046
                }
1047

    
1048
                append(o)
1049

    
1050
            after = g.issue_time
1051
            if after >= before:
1052
                break
1053

    
1054
        return timeline
1055

    
1056

    
1057
def _add(x, y, invert=False):
1058
    return x + y if not invert else x - y
1059

    
1060

    
1061
def _update(dest, source, attr, delta):
1062
    dest_attr = getattr(dest, attr)
1063
    dest_attr = _add(getattr(source, attr, 0), delta)
1064

    
1065

    
1066
def _isneg(x):
1067
    return x < 0
1068

    
1069

    
1070
API_Callpoint = QuotaholderDjangoDBCallpoint