Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / quotaholder / callpoint.py @ ccdb7c02

History | View | Annotate | Download (38.9 kB)

1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from synnefo.lib.quotaholder.api import (
35
    QuotaholderAPI,
36
    QH_PRACTICALLY_INFINITE,
37
    InvalidKeyError, NoEntityError,
38
    NoQuantityError, NoCapacityError,
39
    ExportLimitError, ImportLimitError,
40
    DuplicateError)
41

    
42
from synnefo.lib.commissioning import (
43
    Callpoint, CorruptedError, InvalidDataError, ReturnButFail)
44
from synnefo.lib.commissioning.utils.newname import newname
45

    
46
from django.db.models import Q, Count
47
from django.db import transaction
48
from .models import (Entity, Policy, Holding,
49
                     Commission, Provision, ProvisionLog, CallSerial,
50
                     now,
51
                     db_get_entity, db_get_holding, db_get_policy,
52
                     db_get_commission, db_filter_provision, db_get_callserial)
53

    
54

    
55
class QuotaholderDjangoDBCallpoint(Callpoint):
56

    
57
    api_spec = QuotaholderAPI()
58

    
59
    http_exc_lookup = {
60
        CorruptedError:   550,
61
        InvalidDataError: 400,
62
        InvalidKeyError:  401,
63
        NoEntityError:    404,
64
        NoQuantityError:  413,
65
        NoCapacityError:  413,
66
    }
67

    
68
    def init_connection(self, connection):
69
        if connection is not None:
70
            raise ValueError("Cannot specify connection args with %s" %
71
                             type(self).__name__)
72

    
73
    def commit(self):
74
        transaction.commit()
75

    
76
    def rollback(self):
77
        transaction.rollback()
78

    
79
    def do_make_call(self, call_name, data):
80
        call_fn = getattr(self, call_name, None)
81
        if not call_fn:
82
            m = "cannot find call '%s'" % (call_name,)
83
            raise CorruptedError(m)
84

    
85
        return call_fn(**data)
86

    
87
    def create_entity(self, context=None, create_entity=()):
88
        rejected = []
89
        append = rejected.append
90

    
91
        for idx, (entity, owner, key, ownerkey) in enumerate(create_entity):
92
            try:
93
                owner = Entity.objects.get(entity=owner, key=ownerkey)
94
            except Entity.DoesNotExist:
95
                append(idx)
96
                continue
97

    
98
            try:
99
                e = Entity.objects.get(entity=entity)
100
                append(idx)
101
            except Entity.DoesNotExist:
102
                e = Entity.objects.create(entity=entity,
103
                                          owner=owner,
104
                                          key=key)
105

    
106
        if rejected:
107
            raise ReturnButFail(rejected)
108
        return rejected
109

    
110
    def set_entity_key(self, context=None, set_entity_key=()):
111
        rejected = []
112
        append = rejected.append
113

    
114
        for entity, key, newkey in set_entity_key:
115
            try:
116
                e = db_get_entity(entity=entity, key=key, for_update=True)
117
            except Entity.DoesNotExist:
118
                append(entity)
119
                continue
120

    
121
            e.key = newkey
122
            e.save()
123

    
124
        if rejected:
125
            raise ReturnButFail(rejected)
126
        return rejected
127

    
128
    def list_entities(self, context=None, entity=None, key=None):
129
        try:
130
            e = Entity.objects.get(entity=entity, key=key)
131
        except Entity.DoesNotExist:
132
            m = "Entity '%s' does not exist" % (entity,)
133
            raise NoEntityError(m)
134

    
135
        children = e.entities.all()
136
        entities = [e.entity for e in children]
137
        return entities
138

    
139
    def get_entity(self, context=None, get_entity=()):
140
        entities = []
141
        append = entities.append
142

    
143
        names = [entity for entity, key in get_entity]
144
        es = Entity.objects.select_related(depth=1).filter(entity__in=names)
145
        data = {}
146
        for e in es:
147
            data[e.entity] = e
148

    
149
        for entity, key in get_entity:
150
            e = data.get(entity, None)
151
            if e is None or e.key != key:
152
                continue
153
            append((entity, e.owner.entity))
154

    
155
        return entities
156

    
157
    def get_limits(self, context=None, get_limits=()):
158
        limits = []
159
        append = limits.append
160

    
161
        for policy in get_limits:
162
            try:
163
                p = Policy.objects.get(policy=policy)
164
            except Policy.DoesNotExist:
165
                continue
166

    
167
            append((policy, p.quantity, p.capacity,
168
                    p.import_limit, p.export_limit))
169

    
170
        return limits
171

    
172
    def set_limits(self, context=None, set_limits=()):
173

    
174
        for (policy, quantity, capacity,
175
             import_limit, export_limit) in set_limits:
176

    
177
            try:
178
                policy = db_get_policy(policy=policy, for_update=True)
179
            except Policy.DoesNotExist:
180
                Policy.objects.create(policy=policy,
181
                                      quantity=quantity,
182
                                      capacity=capacity,
183
                                      import_limit=import_limit,
184
                                      export_limit=export_limit)
185
            else:
186
                policy.quantity = quantity
187
                policy.capacity = capacity
188
                policy.export_limit = export_limit
189
                policy.import_limit = import_limit
190
                policy.save()
191

    
192
        return ()
193

    
194
    def get_holding(self, context=None, get_holding=()):
195
        holdings = []
196
        append = holdings.append
197

    
198
        for entity, resource, key in get_holding:
199
            try:
200
                h = Holding.objects.get(entity=entity, resource=resource)
201
            except Holding.DoesNotExist:
202
                continue
203

    
204
            if h.entity.key != key:
205
                continue
206

    
207
            append((h.entity.entity, h.resource, h.policy.policy,
208
                    h.imported, h.exported,
209
                    h.returned, h.released, h.flags))
210

    
211
        return holdings
212

    
213
    def _set_holding(self, entity, resource, policy, flags):
214
        try:
215
            h = db_get_holding(entity=entity, resource=resource,
216
                               for_update=True)
217
            h.policy = p
218
            h.flags = flags
219
            h.save()
220
        except Holding.DoesNotExist:
221
            h = Holding.objects.create(entity=e, resource=resource,
222
                                       policy=p, flags=flags)
223
        return h
224

    
225
    def set_holding(self, context=None, set_holding=()):
226
        rejected = []
227
        append = rejected.append
228

    
229
        for entity, resource, key, policy, flags in set_holding:
230
            try:
231
                e = Entity.objects.get(entity=entity, key=key)
232
            except Entity.DoesNotExist:
233
                append((entity, resource, policy))
234
                continue
235

    
236
            if e.key != key:
237
                append((entity, resource, policy))
238
                continue
239

    
240
            try:
241
                p = Policy.objects.get(policy=policy)
242
            except Policy.DoesNotExist:
243
                append((entity, resource, policy))
244
                continue
245

    
246
            try:
247
                h = db_get_holding(entity=entity, resource=resource,
248
                                   for_update=True)
249
                h.policy = p
250
                h.flags = flags
251
                h.save()
252
            except Holding.DoesNotExist:
253
                h = Holding.objects.create(entity=e, resource=resource,
254
                                           policy=p, flags=flags)
255

    
256
        if rejected:
257
            raise ReturnButFail(rejected)
258
        return rejected
259

    
260
    def _init_holding(self,
261
                      entity, resource, policy,
262
                      imported, exported, returned, released,
263
                      flags):
264
        try:
265
            h = db_get_holding(entity=entity, resource=resource,
266
                               for_update=True)
267
        except Holding.DoesNotExist:
268
            h = Holding(entity=entity, resource=resource)
269

    
270
        h.policy = policy
271
        h.flags = flags
272
        h.imported = imported
273
        h.importing = imported
274
        h.exported = exported
275
        h.exporting = exported
276
        h.returned = returned
277
        h.returning = returned
278
        h.released = released
279
        h.releasing = released
280
        h.save()
281

    
282
    def init_holding(self, context=None, init_holding=()):
283
        rejected = []
284
        append = rejected.append
285

    
286
        for idx, sfh in enumerate(init_holding):
287
            (entity, resource, key, policy,
288
             imported, exported, returned, released,
289
             flags) = sfh
290
            try:
291
                e = Entity.objects.get(entity=entity, key=key)
292
            except Entity.DoesNotExist:
293
                append(idx)
294
                continue
295

    
296
            if e.key != key:
297
                append(idx)
298
                continue
299

    
300
            try:
301
                p = Policy.objects.get(policy=policy)
302
            except Policy.DoesNotExist:
303
                append(idx)
304
                continue
305

    
306
            self._init_holding(e, resource, p,
307
                               imported, exported,
308
                               returned, released,
309
                               flags)
310
        if rejected:
311
            raise ReturnButFail(rejected)
312
        return rejected
313

    
314
    def reset_holding(self, context=None, reset_holding=()):
315
        rejected = []
316
        append = rejected.append
317

    
318
        for idx, tpl in enumerate(reset_holding):
319
            (entity, resource, key,
320
             imported, exported, returned, released) = tpl
321
            try:
322
                e = Entity.objects.get(entity=entity, key=key)
323
            except Entity.DoesNotExist:
324
                append(idx)
325
                continue
326

    
327
            try:
328
                h = db_get_holding(entity=entity, resource=resource,
329
                                   for_update=True)
330
                h.imported = imported
331
                h.importing = imported
332
                h.exported = exported
333
                h.exporting = exported
334
                h.returned = returned
335
                h.returning = returned
336
                h.released = released
337
                h.releasing = released
338
                h.save()
339
            except Holding.DoesNotExist:
340
                append(idx)
341
                continue
342

    
343
        if rejected:
344
            raise ReturnButFail(rejected)
345
        return rejected
346

    
347
    def _check_pending(self, entity, resource):
348
        cs = Commission.objects.filter(entity=entity)
349
        cs = [c for c in cs if c.provisions.filter(resource=resource)]
350
        as_target = [c.serial for c in cs]
351

    
352
        ps = Provision.objects.filter(entity=entity, resource=resource)
353
        as_source = [p.serial.serial for p in ps]
354

    
355
        return as_target + as_source
356

    
357
    def _actual_quantity(self, holding):
358
        hp = holding.policy
359
        return hp.quantity + (holding.imported + holding.returned -
360
                              holding.exported - holding.released)
361

    
362
    def _new_policy_name(self):
363
        return newname('policy_')
364

    
365
    def _increase_resource(self, entity, resource, amount):
366
        try:
367
            h = db_get_holding(entity=entity, resource=resource,
368
                               for_update=True)
369
        except Holding.DoesNotExist:
370
            h = Holding(entity=entity, resource=resource)
371
            p = Policy.objects.create(policy=self._new_policy_name(),
372
                                      quantity=0,
373
                                      capacity=QH_PRACTICALLY_INFINITE,
374
                                      import_limit=QH_PRACTICALLY_INFINITE,
375
                                      export_limit=QH_PRACTICALLY_INFINITE)
376
            h.policy = p
377
        h.imported += amount
378
        h.save()
379

    
380
    def release_holding(self, context=None, release_holding=()):
381
        rejected = []
382
        append = rejected.append
383

    
384
        for idx, (entity, resource, key) in enumerate(release_holding):
385
            try:
386
                h = db_get_holding(entity=entity, resource=resource,
387
                                   for_update=True)
388
            except Holding.DoesNotExist:
389
                append(idx)
390
                continue
391

    
392
            if h.entity.key != key:
393
                append(idx)
394
                continue
395

    
396
            if self._check_pending(entity, resource):
397
                append(idx)
398
                continue
399

    
400
            q = self._actual_quantity(h)
401
            if q > 0:
402
                owner = h.entity.owner
403
                self._increase_resource(owner, resource, q)
404

    
405
            h.delete()
406

    
407
        if rejected:
408
            raise ReturnButFail(rejected)
409
        return rejected
410

    
411
    def list_resources(self, context=None, entity=None, key=None):
412
        try:
413
            e = Entity.objects.get(entity=entity)
414
        except Entity.DoesNotExist:
415
            m = "No such entity '%s'" % (entity,)
416
            raise NoEntityError(m)
417

    
418
        if e.key != key:
419
            m = "Invalid key for entity '%s'" % (entity,)
420
            raise InvalidKeyError(m)
421

    
422
        holdings = e.holding_set.filter(entity=entity)
423
        resources = [h.resource for h in holdings]
424
        return resources
425

    
426
    def list_holdings(self, context=None, list_holdings=()):
427
        rejected = []
428
        reject = rejected.append
429
        holdings_list = []
430
        append = holdings_list.append
431

    
432
        for entity, key in list_holdings:
433
            try:
434
                e = Entity.objects.get(entity=entity)
435
                if e.key != key:
436
                    raise Entity.DoesNotExist("wrong key")
437
            except Entity.DoesNotExist:
438
                reject(entity)
439
                continue
440

    
441
            holdings = e.holding_set.filter(entity=entity)
442
            append([[entity, h.resource,
443
                     h.imported, h.exported, h.returned, h.released]
444
                    for h in holdings])
445

    
446
        return holdings_list, rejected
447

    
448
    def get_quota(self, context=None, get_quota=()):
449
        quotas = []
450
        append = quotas.append
451

    
452
        entities = set(e for e, r, k in get_quota)
453
        hs = Holding.objects.select_related().filter(entity__in=entities)
454
        holdings = {}
455
        for h in hs:
456
            holdings[(h.entity_id, h.resource)] = h
457

    
458
        for entity, resource, key in get_quota:
459
            try:
460
                h = holdings[(entity, resource)]
461
            except:
462
                continue
463

    
464
            if h.entity.key != key:
465
                continue
466

    
467
            p = h.policy
468

    
469
            append((h.entity.entity, h.resource, p.quantity, p.capacity,
470
                    p.import_limit, p.export_limit,
471
                    h.imported, h.exported,
472
                    h.returned, h.released,
473
                    h.flags))
474

    
475
        return quotas
476

    
477
    def set_quota(self, context=None, set_quota=()):
478
        rejected = []
479
        append = rejected.append
480

    
481
        q_holdings = Q()
482
        entities = []
483
        for (entity, resource, key, _, _, _, _, _) in set_quota:
484
            entities.append(entity)
485

    
486
        hs = Holding.objects.filter(entity__in=entities).select_for_update()
487
        holdings = {}
488
        for h in hs:
489
            holdings[(h.entity_id, h.resource)] = h
490

    
491
        entities = Entity.objects.in_bulk(entities)
492

    
493
        old_policies = []
494

    
495
        for (entity, resource, key,
496
             quantity, capacity,
497
             import_limit, export_limit, flags) in set_quota:
498

    
499
            e = entities.get(entity, None)
500
            if e is None or e.key != key:
501
                append((entity, resource))
502
                continue
503

    
504
            policy = newname('policy_')
505
            newp = Policy(policy=policy,
506
                          quantity=quantity,
507
                          capacity=capacity,
508
                          import_limit=import_limit,
509
                          export_limit=export_limit)
510

    
511
            try:
512
                h = holdings[(entity, resource)]
513
                old_policies.append(h.policy_id)
514
                h.policy = newp
515
                h.flags = flags
516
            except KeyError:
517
                h = Holding(entity=e, resource=resource,
518
                            policy=newp, flags=flags)
519

    
520
            # the order is intentionally reversed so that it
521
            # would break if we are not within a transaction.
522
            # Has helped before.
523
            h.save()
524
            newp.save()
525
            holdings[(entity, resource)] = h
526

    
527
        objs = Policy.objects.annotate(refs=Count('holding'))
528
        objs.filter(policy__in=old_policies, refs=0).delete()
529

    
530
        if rejected:
531
            raise ReturnButFail(rejected)
532
        return rejected
533

    
534
    def add_quota(self,
535
                  context=None, clientkey=None, serial=None,
536
                  sub_quota=(), add_quota=()):
537
        rejected = []
538
        append = rejected.append
539

    
540
        if serial is not None:
541
            if clientkey is None:
542
                all_pairs = [(q[0], q[1]) for q in sub_quota + add_quota]
543
                raise ReturnButFail(all_pairs)
544
            try:
545
                cs = CallSerial.objects.get(serial=serial, clientkey=clientkey)
546
                all_pairs = [(q[0], q[1]) for q in sub_quota + add_quota]
547
                raise ReturnButFail(all_pairs)
548
            except CallSerial.DoesNotExist:
549
                pass
550

    
551
        sources = sub_quota + add_quota
552
        q_holdings = Q()
553
        entities = []
554
        for (entity, resource, key, _, _, _, _) in sources:
555
            entities.append(entity)
556

    
557
        hs = Holding.objects.filter(entity__in=entities).select_for_update()
558
        holdings = {}
559
        for h in hs:
560
            holdings[(h.entity_id, h.resource)] = h
561

    
562
        entities = Entity.objects.in_bulk(entities)
563

    
564
        pids = [h.policy_id for h in hs]
565
        policies = Policy.objects.in_bulk(pids)
566

    
567
        old_policies = []
568

    
569
        for removing, source in [(True, sub_quota), (False, add_quota)]:
570
            for (entity, resource, key,
571
                 quantity, capacity,
572
                 import_limit, export_limit) in source:
573

    
574
                e = entities.get(entity, None)
575
                if e is None or e.key != key:
576
                    append((entity, resource))
577
                    continue
578

    
579
                try:
580
                    h = holdings[(entity, resource)]
581
                    old_policies.append(h.policy_id)
582
                    try:
583
                        p = policies[h.policy_id]
584
                    except KeyError:
585
                        raise AssertionError("no policy %s" % h.policy_id)
586
                except KeyError:
587
                    if removing:
588
                        append((entity, resource))
589
                        continue
590
                    h = Holding(entity=e, resource=resource, flags=0)
591
                    p = None
592

    
593
                policy = newname('policy_')
594
                newp = Policy(policy=policy)
595

    
596
                newp.quantity = _add(p.quantity if p else 0, quantity,
597
                                     invert=removing)
598
                newp.capacity = _add(p.capacity if p else 0, capacity,
599
                                     invert=removing)
600
                newp.import_limit = _add(p.import_limit if p else 0,
601
                                         import_limit, invert=removing)
602
                newp.export_limit = _add(p.export_limit if p else 0,
603
                                         export_limit, invert=removing)
604

    
605
                new_values = [newp.capacity,
606
                              newp.import_limit, newp.export_limit]
607
                if any(map(_isneg, new_values)):
608
                    append((entity, resource))
609
                    continue
610

    
611
                h.policy = newp
612

    
613
                # the order is intentionally reversed so that it
614
                # would break if we are not within a transaction.
615
                # Has helped before.
616
                h.save()
617
                newp.save()
618
                policies[policy] = newp
619
                holdings[(entity, resource)] = h
620

    
621
        objs = Policy.objects.annotate(refs=Count('holding'))
622
        objs.filter(policy__in=old_policies, refs=0).delete()
623

    
624
        if rejected:
625
            raise ReturnButFail(rejected)
626

    
627
        if serial is not None and clientkey is not None:
628
            CallSerial.objects.create(serial=serial, clientkey=clientkey)
629
        return rejected
630

    
631
    def ack_serials(self, context=None, clientkey=None, serials=()):
632
        if clientkey is None:
633
            return
634

    
635
        for serial in serials:
636
            try:
637
                c = db_get_callserial(clientkey=clientkey,
638
                                      serial=serial,
639
                                      for_update=True)
640
                c.delete()
641
            except CallSerial.DoesNotExist:
642
                pass
643
        return
644

    
645
    def query_serials(self, context=None, clientkey=None, serials=()):
646
        result = []
647
        append = result.append
648

    
649
        if clientkey is None:
650
            return result
651

    
652
        if not serials:
653
            cs = CallSerial.objects.filter(clientkey=clientkey)
654
            return [c.serial for c in cs]
655

    
656
        for serial in serials:
657
            try:
658
                db_get_callserial(clientkey=clientkey, serial=serial)
659
                append(serial)
660
            except CallSerial.DoesNotExist:
661
                pass
662

    
663
        return result
664

    
665
    def issue_commission(self,
666
                         context=None,
667
                         clientkey=None,
668
                         target=None,
669
                         key=None,
670
                         name=None,
671
                         provisions=()):
672

    
673
        try:
674
            t = Entity.objects.get(entity=target)
675
        except Entity.DoesNotExist:
676
            m = "No target entity '%s'" % (target,)
677
            raise NoEntityError(m)
678
        else:
679
            if t.key != key:
680
                m = "Invalid key for target entity '%s'" % (target,)
681
                raise InvalidKeyError(m)
682

    
683
        create = Commission.objects.create
684
        commission = create(entity_id=target, clientkey=clientkey, name=name)
685
        serial = commission.serial
686

    
687
        checked = []
688
        for entity, resource, quantity in provisions:
689

    
690
            if entity == target:
691
                m = "Cannot issue commission from an entity to itself (%s)" % (
692
                    entity,)
693
                raise InvalidDataError(m)
694

    
695
            ent_res = entity, resource
696
            if ent_res in checked:
697
                m = "Duplicate provision for %s.%s" % ent_res
698
                raise DuplicateError(m)
699
            checked.append(ent_res)
700

    
701
            try:
702
                e = Entity.objects.get(entity=entity)
703
            except Entity.DoesNotExist:
704
                m = "No source entity '%s'" % (entity,)
705
                raise NoEntityError(m)
706

    
707
            release = 0
708
            if quantity < 0:
709
                release = 1
710

    
711
            # Source limits checks
712
            try:
713
                h = db_get_holding(entity=entity, resource=resource,
714
                                   for_update=True)
715
            except Holding.DoesNotExist:
716
                m = ("There is no quantity "
717
                     "to allocate from in %s.%s" % (entity, resource))
718
                raise NoQuantityError(m,
719
                                      source=entity, target=target,
720
                                      resource=resource, requested=quantity,
721
                                      current=0, limit=0)
722

    
723
            hp = h.policy
724

    
725
            if not release:
726
                current = h.exporting
727
                limit = hp.export_limit
728
                if current + quantity > limit:
729
                    m = ("Export limit reached for %s.%s" % (entity, resource))
730
                    raise ExportLimitError(m,
731
                                           source=entity,
732
                                           target=target,
733
                                           resource=resource,
734
                                           requested=quantity,
735
                                           current=current,
736
                                           limit=limit)
737

    
738
                limit = hp.quantity + h.imported - h.releasing
739
                unavailable = h.exporting - h.returned
740
                available = limit - unavailable
741

    
742
                if quantity > available:
743
                    m = ("There is not enough quantity "
744
                         "to allocate from in %s.%s" % (entity, resource))
745
                    raise NoQuantityError(m,
746
                                          source=entity,
747
                                          target=target,
748
                                          resource=resource,
749
                                          requested=quantity,
750
                                          current=unavailable,
751
                                          limit=limit)
752
            else:
753
                current = (+ h.importing + h.returning
754
                           - h.exported - h.returned)
755
                limit = hp.capacity
756
                if current - quantity > limit:
757
                    m = ("There is not enough capacity "
758
                         "to release to in %s.%s" % (entity, resource))
759
                    raise NoQuantityError(m,
760
                                          source=entity,
761
                                          target=target,
762
                                          resource=resource,
763
                                          requested=quantity,
764
                                          current=current,
765
                                          limit=limit)
766

    
767
            # Target limits checks
768
            try:
769
                th = db_get_holding(entity=target, resource=resource,
770
                                    for_update=True)
771
            except Holding.DoesNotExist:
772
                m = ("There is no capacity "
773
                     "to allocate into in %s.%s" % (target, resource))
774
                raise NoCapacityError(m,
775
                                      source=entity,
776
                                      target=target,
777
                                      resource=resource,
778
                                      requested=quantity,
779
                                      current=0,
780
                                      limit=0)
781

    
782
            tp = th.policy
783

    
784
            if not release:
785
                limit = tp.import_limit
786
                current = th.importing
787
                if current + quantity > limit:
788
                    m = ("Import limit reached for %s.%s" % (target, resource))
789
                    raise ImportLimitError(m,
790
                                           source=entity,
791
                                           target=target,
792
                                           resource=resource,
793
                                           requested=quantity,
794
                                           current=current,
795
                                           limit=limit)
796

    
797
                limit = tp.quantity + tp.capacity
798
                current = (+ th.importing + th.returning + tp.quantity
799
                           - th.exported - th.released)
800

    
801
                if current + quantity > limit:
802
                    m = ("There is not enough capacity "
803
                         "to allocate into in %s.%s" % (target, resource))
804
                    raise NoCapacityError(m,
805
                                          source=entity,
806
                                          target=target,
807
                                          resource=resource,
808
                                          requested=quantity,
809
                                          current=current,
810
                                          limit=limit)
811
            else:
812
                limit = tp.quantity + th.imported - th.releasing
813
                unavailable = th.exporting - th.returned
814
                available = limit - unavailable
815

    
816
                if available + quantity < 0:
817
                    m = ("There is not enough quantity "
818
                         "to release from in %s.%s" % (target, resource))
819
                    raise NoCapacityError(m,
820
                                          source=entity,
821
                                          target=target,
822
                                          resource=resource,
823
                                          requested=quantity,
824
                                          current=unavailable,
825
                                          limit=limit)
826

    
827
            Provision.objects.create(serial=commission,
828
                                     entity=e,
829
                                     resource=resource,
830
                                     quantity=quantity)
831
            if release:
832
                h.returning -= quantity
833
                th.releasing -= quantity
834
            else:
835
                h.exporting += quantity
836
                th.importing += quantity
837

    
838
            h.save()
839
            th.save()
840

    
841
        return serial
842

    
843
    def _log_provision(self,
844
                       commission, s_holding, t_holding,
845
                       provision, log_time, reason):
846

    
847
        s_entity = s_holding.entity
848
        s_policy = s_holding.policy
849
        t_entity = t_holding.entity
850
        t_policy = t_holding.policy
851

    
852
        kwargs = {
853
            'serial':              commission.serial,
854
            'name':                commission.name,
855
            'source':              s_entity.entity,
856
            'target':              t_entity.entity,
857
            'resource':            provision.resource,
858
            'source_quantity':     s_policy.quantity,
859
            'source_capacity':     s_policy.capacity,
860
            'source_import_limit': s_policy.import_limit,
861
            'source_export_limit': s_policy.export_limit,
862
            'source_imported':     s_holding.imported,
863
            'source_exported':     s_holding.exported,
864
            'source_returned':     s_holding.returned,
865
            'source_released':     s_holding.released,
866
            'target_quantity':     t_policy.quantity,
867
            'target_capacity':     t_policy.capacity,
868
            'target_import_limit': t_policy.import_limit,
869
            'target_export_limit': t_policy.export_limit,
870
            'target_imported':     t_holding.imported,
871
            'target_exported':     t_holding.exported,
872
            'target_returned':     t_holding.returned,
873
            'target_released':     t_holding.released,
874
            'delta_quantity':      provision.quantity,
875
            'issue_time':          commission.issue_time,
876
            'log_time':            log_time,
877
            'reason':              reason,
878
        }
879

    
880
        ProvisionLog.objects.create(**kwargs)
881

    
882
    def accept_commission(self,
883
                          context=None, clientkey=None,
884
                          serials=(), reason=''):
885
        log_time = now()
886

    
887
        for serial in serials:
888
            try:
889
                c = db_get_commission(clientkey=clientkey, serial=serial,
890
                                      for_update=True)
891
            except Commission.DoesNotExist:
892
                return
893

    
894
            t = c.entity
895

    
896
            provisions = db_filter_provision(serial=serial, for_update=True)
897
            for pv in provisions:
898
                try:
899
                    h = db_get_holding(entity=pv.entity.entity,
900
                                       resource=pv.resource, for_update=True)
901
                    th = db_get_holding(entity=t, resource=pv.resource,
902
                                        for_update=True)
903
                except Holding.DoesNotExist:
904
                    m = "Corrupted provision"
905
                    raise CorruptedError(m)
906

    
907
                quantity = pv.quantity
908
                release = 0
909
                if quantity < 0:
910
                    release = 1
911

    
912
                if release:
913
                    h.returned -= quantity
914
                    th.released -= quantity
915
                else:
916
                    h.exported += quantity
917
                    th.imported += quantity
918

    
919
                reason = 'ACCEPT:' + reason[-121:]
920
                self._log_provision(c, h, th, pv, log_time, reason)
921
                h.save()
922
                th.save()
923
                pv.delete()
924
            c.delete()
925

    
926
        return
927

    
928
    def reject_commission(self,
929
                          context=None, clientkey=None,
930
                          serials=(), reason=''):
931
        log_time = now()
932

    
933
        for serial in serials:
934
            try:
935
                c = db_get_commission(clientkey=clientkey, serial=serial,
936
                                      for_update=True)
937
            except Commission.DoesNotExist:
938
                return
939

    
940
            t = c.entity
941

    
942
            provisions = db_filter_provision(serial=serial, for_update=True)
943
            for pv in provisions:
944
                try:
945
                    h = db_get_holding(entity=pv.entity.entity,
946
                                       resource=pv.resource, for_update=True)
947
                    th = db_get_holding(entity=t, resource=pv.resource,
948
                                        for_update=True)
949
                except Holding.DoesNotExist:
950
                    m = "Corrupted provision"
951
                    raise CorruptedError(m)
952

    
953
                quantity = pv.quantity
954
                release = 0
955
                if quantity < 0:
956
                    release = 1
957

    
958
                if release:
959
                    h.returning += quantity
960
                    th.releasing += quantity
961
                else:
962
                    h.exporting -= quantity
963
                    th.importing -= quantity
964

    
965
                reason = 'REJECT:' + reason[-121:]
966
                self._log_provision(c, h, th, pv, log_time, reason)
967
                h.save()
968
                th.save()
969
                pv.delete()
970
            c.delete()
971

    
972
        return
973

    
974
    def get_pending_commissions(self, context=None, clientkey=None):
975
        pending = Commission.objects.filter(clientkey=clientkey)
976
        pending_list = pending.values_list('serial', flat=True)
977
        return pending_list
978

    
979
    def resolve_pending_commissions(self,
980
                                    context=None, clientkey=None,
981
                                    max_serial=None, accept_set=()):
982
        accept_set = set(accept_set)
983
        pending = self.get_pending_commissions(context=context,
984
                                               clientkey=clientkey)
985
        pending = sorted(pending)
986

    
987
        accept = self.accept_commission
988
        reject = self.reject_commission
989

    
990
        for serial in pending:
991
            if serial > max_serial:
992
                break
993

    
994
            if serial in accept_set:
995
                accept(context=context, clientkey=clientkey, serials=[serial])
996
            else:
997
                reject(context=context, clientkey=clientkey, serials=[serial])
998

    
999
        return
1000

    
1001
    def release_entity(self, context=None, release_entity=()):
1002
        rejected = []
1003
        append = rejected.append
1004
        for entity, key in release_entity:
1005
            try:
1006
                e = db_get_entity(entity=entity, key=key, for_update=True)
1007
            except Entity.DoesNotExist:
1008
                append(entity)
1009
                continue
1010

    
1011
            if e.entities.count() != 0:
1012
                append(entity)
1013
                continue
1014

    
1015
            if e.holding_set.count() != 0:
1016
                append(entity)
1017
                continue
1018

    
1019
            e.delete()
1020

    
1021
        if rejected:
1022
            raise ReturnButFail(rejected)
1023
        return rejected
1024

    
1025
    def get_timeline(self, context=None, after="", before="Z", get_timeline=()):
1026
        entity_set = set()
1027
        e_add = entity_set.add
1028
        resource_set = set()
1029
        r_add = resource_set.add
1030

    
1031
        for entity, resource, key in get_timeline:
1032
            if entity not in entity_set:
1033
                try:
1034
                    e = Entity.objects.get(entity=entity, key=key)
1035
                    e_add(entity)
1036
                except Entity.DoesNotExist:
1037
                    continue
1038

    
1039
            r_add((entity, resource))
1040

    
1041
        chunk_size = 65536
1042
        nr = 0
1043
        timeline = []
1044
        append = timeline.append
1045
        filterlogs = ProvisionLog.objects.filter
1046
        if entity_set:
1047
            q_entity = Q(source__in=entity_set) | Q(target__in=entity_set)
1048
        else:
1049
            q_entity = Q()
1050

    
1051
        while 1:
1052
            logs = filterlogs(q_entity,
1053
                              issue_time__gt=after,
1054
                              issue_time__lte=before,
1055
                              reason__startswith='ACCEPT:')
1056

    
1057
            logs = logs.order_by('issue_time')
1058
            #logs = logs.values()
1059
            logs = logs[:chunk_size]
1060
            nr += len(logs)
1061
            if not logs:
1062
                break
1063
            for g in logs:
1064
                if ((g.source, g.resource) not in resource_set
1065
                    or (g.target, g.resource) not in resource_set):
1066
                    continue
1067

    
1068
                o = {
1069
                    'serial':                   g.serial,
1070
                    'source':                   g.source,
1071
                    'target':                   g.target,
1072
                    'resource':                 g.resource,
1073
                    'name':                     g.name,
1074
                    'quantity':                 g.delta_quantity,
1075
                    'source_allocated':         g.source_allocated(),
1076
                    'source_allocated_through': g.source_allocated_through(),
1077
                    'source_inbound':           g.source_inbound(),
1078
                    'source_inbound_through':   g.source_inbound_through(),
1079
                    'source_outbound':          g.source_outbound(),
1080
                    'source_outbound_through':  g.source_outbound_through(),
1081
                    'target_allocated':         g.target_allocated(),
1082
                    'target_allocated_through': g.target_allocated_through(),
1083
                    'target_inbound':           g.target_inbound(),
1084
                    'target_inbound_through':   g.target_inbound_through(),
1085
                    'target_outbound':          g.target_outbound(),
1086
                    'target_outbound_through':  g.target_outbound_through(),
1087
                    'issue_time':               g.issue_time,
1088
                    'log_time':                 g.log_time,
1089
                    'reason':                   g.reason,
1090
                }
1091

    
1092
                append(o)
1093

    
1094
            after = g.issue_time
1095
            if after >= before:
1096
                break
1097

    
1098
        return timeline
1099

    
1100

    
1101
def _add(x, y, invert=False):
1102
    return x + y if not invert else x - y
1103

    
1104

    
1105
def _update(dest, source, attr, delta):
1106
    dest_attr = getattr(dest, attr)
1107
    dest_attr = _add(getattr(source, attr, 0), delta)
1108

    
1109

    
1110
def _isneg(x):
1111
    return x < 0
1112

    
1113

    
1114
API_Callpoint = QuotaholderDjangoDBCallpoint