Statistics
| Branch: | Tag: | Revision:

root / snf-quotaholder-app / quotaholder_django / quotaholder_app / callpoint.py @ 2a3f6a3e

History | View | Annotate | Download (38.9 kB)

1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from synnefo.lib.quotaholder.api import (
35
    QuotaholderAPI,
36
    QH_PRACTICALLY_INFINITE,
37
    InvalidKeyError, NoEntityError,
38
    NoQuantityError, NoCapacityError,
39
    ExportLimitError, ImportLimitError,
40
    DuplicateError)
41

    
42
from synnefo.lib.commissioning import (
43
    Callpoint, CorruptedError, InvalidDataError, ReturnButFail)
44
from synnefo.lib.commissioning.utils.newname import newname
45

    
46
from django.db.models import Q, Count
47
from django.db import transaction
48
from .models import (Entity, Policy, Holding,
49
                     Commission, Provision, ProvisionLog, CallSerial,
50
                     now,
51
                     db_get_entity, db_get_holding, db_get_policy,
52
                     db_get_commission, db_filter_provision, db_get_callserial)
53

    
54

    
55
class QuotaholderDjangoDBCallpoint(Callpoint):
56

    
57
    api_spec = QuotaholderAPI()
58

    
59
    http_exc_lookup = {
60
        CorruptedError:   550,
61
        InvalidDataError: 400,
62
        InvalidKeyError:  401,
63
        NoEntityError:    404,
64
        NoQuantityError:  413,
65
        NoCapacityError:  413,
66
    }
67

    
68
    def init_connection(self, connection):
69
        if connection is not None:
70
            raise ValueError("Cannot specify connection args with %s" %
71
                             type(self).__name__)
72

    
73
    def commit(self):
74
        transaction.commit()
75

    
76
    def rollback(self):
77
        transaction.rollback()
78

    
79
    def do_make_call(self, call_name, data):
80
        call_fn = getattr(self, call_name, None)
81
        if not call_fn:
82
            m = "cannot find call '%s'" % (call_name,)
83
            raise CorruptedError(m)
84

    
85
        return call_fn(**data)
86

    
87
    def create_entity(self, context={}, create_entity=()):
88
        rejected = []
89
        append = rejected.append
90

    
91
        for idx, (entity, owner, key, ownerkey) in enumerate(create_entity):
92
            try:
93
                owner = Entity.objects.get(entity=owner, key=ownerkey)
94
            except Entity.DoesNotExist:
95
                append(idx)
96
                continue
97

    
98
            try:
99
                e = Entity.objects.get(entity=entity)
100
                append(idx)
101
            except Entity.DoesNotExist:
102
                e = Entity.objects.create(entity=entity,
103
                                          owner=owner,
104
                                          key=key)
105

    
106
        if rejected:
107
            raise ReturnButFail(rejected)
108
        return rejected
109

    
110
    def set_entity_key(self, context={}, set_entity_key=()):
111
        rejected = []
112
        append = rejected.append
113

    
114
        for entity, key, newkey in set_entity_key:
115
            try:
116
                e = db_get_entity(entity=entity, key=key, for_update=True)
117
            except Entity.DoesNotExist:
118
                append(entity)
119
                continue
120

    
121
            e.key = newkey
122
            e.save()
123

    
124
        if rejected:
125
            raise ReturnButFail(rejected)
126
        return rejected
127

    
128
    def list_entities(self, context={}, entity=None, key=None):
129
        try:
130
            e = Entity.objects.get(entity=entity, key=key)
131
        except Entity.DoesNotExist:
132
            m = "Entity '%s' does not exist" % (entity,)
133
            raise NoEntityError(m)
134

    
135
        children = e.entities.all()
136
        entities = [e.entity for e in children]
137
        return entities
138

    
139
    def get_entity(self, context={}, get_entity=()):
140
        entities = []
141
        append = entities.append
142

    
143
        names = [entity for entity, key in get_entity]
144
        es = Entity.objects.select_related(depth=1).filter(entity__in=names)
145
        data = {}
146
        for e in es:
147
            data[e.entity] = e
148

    
149
        for entity, key in get_entity:
150
            e = data.get(entity, None)
151
            if e is None or e.key != key:
152
                continue
153
            append((entity, e.owner.entity))
154

    
155
        return entities
156

    
157
    def get_limits(self, context={}, get_limits=()):
158
        limits = []
159
        append = limits.append
160

    
161
        for policy in get_limits:
162
            try:
163
                p = Policy.objects.get(policy=policy)
164
            except Policy.DoesNotExist:
165
                continue
166

    
167
            append((policy, p.quantity, p.capacity,
168
                    p.import_limit, p.export_limit))
169

    
170
        return limits
171

    
172
    def set_limits(self, context={}, set_limits=()):
173

    
174
        for (policy, quantity, capacity,
175
             import_limit, export_limit) in set_limits:
176

    
177
            try:
178
                policy = db_get_policy(policy=policy, for_update=True)
179
            except Policy.DoesNotExist:
180
                Policy.objects.create(policy=policy,
181
                                      quantity=quantity,
182
                                      capacity=capacity,
183
                                      import_limit=import_limit,
184
                                      export_limit=export_limit)
185
            else:
186
                policy.quantity = quantity
187
                policy.capacity = capacity
188
                policy.export_limit = export_limit
189
                policy.import_limit = import_limit
190
                policy.save()
191

    
192
        return ()
193

    
194
    def get_holding(self, context={}, get_holding=()):
195
        holdings = []
196
        append = holdings.append
197

    
198
        for entity, resource, key in get_holding:
199
            try:
200
                h = Holding.objects.get(entity=entity, resource=resource)
201
            except Holding.DoesNotExist:
202
                continue
203

    
204
            if h.entity.key != key:
205
                continue
206

    
207
            append((h.entity.entity, h.resource, h.policy.policy,
208
                    h.imported, h.exported,
209
                    h.returned, h.released, h.flags))
210

    
211
        return holdings
212

    
213
    def _set_holding(self, entity, resource, policy, flags):
214
        try:
215
            h = db_get_holding(entity=entity, resource=resource,
216
                               for_update=True)
217
            h.policy = p
218
            h.flags = flags
219
            h.save()
220
        except Holding.DoesNotExist:
221
            h = Holding.objects.create(entity=e, resource=resource,
222
                                       policy=p, flags=flags)
223
        return h
224

    
225
    def set_holding(self, context={}, set_holding=()):
226
        rejected = []
227
        append = rejected.append
228

    
229
        for entity, resource, key, policy, flags in set_holding:
230
            try:
231
                e = Entity.objects.get(entity=entity, key=key)
232
            except Entity.DoesNotExist:
233
                append((entity, resource, policy))
234
                continue
235

    
236
            if e.key != key:
237
                append((entity, resource, policy))
238
                continue
239

    
240
            try:
241
                p = Policy.objects.get(policy=policy)
242
            except Policy.DoesNotExist:
243
                append((entity, resource, policy))
244
                continue
245

    
246
            try:
247
                h = db_get_holding(entity=entity, resource=resource,
248
                                   for_update=True)
249
                h.policy = p
250
                h.flags = flags
251
                h.save()
252
            except Holding.DoesNotExist:
253
                h = Holding.objects.create(entity=e, resource=resource,
254
                                           policy=p, flags=flags)
255

    
256
        if rejected:
257
            raise ReturnButFail(rejected)
258
        return rejected
259

    
260
    def _init_holding(self,
261
                      entity, resource, policy,
262
                      imported, exported, returned, released,
263
                      flags):
264
        try:
265
            h = db_get_holding(entity=entity, resource=resource,
266
                               for_update=True)
267
        except Holding.DoesNotExist:
268
            h = Holding(entity=entity, resource=resource)
269

    
270
        h.policy = policy
271
        h.flags = flags
272
        h.imported = imported
273
        h.importing = imported
274
        h.exported = exported
275
        h.exporting = exported
276
        h.returned = returned
277
        h.returning = returned
278
        h.released = released
279
        h.releasing = released
280
        h.save()
281

    
282
    def init_holding(self, context={}, init_holding=()):
283
        rejected = []
284
        append = rejected.append
285

    
286
        for idx, sfh in enumerate(init_holding):
287
            (entity, resource, key, policy,
288
             imported, exported, returned, released,
289
             flags) = sfh
290
            try:
291
                e = Entity.objects.get(entity=entity, key=key)
292
            except Entity.DoesNotExist:
293
                append(idx)
294
                continue
295

    
296
            if e.key != key:
297
                append(idx)
298
                continue
299

    
300
            try:
301
                p = Policy.objects.get(policy=policy)
302
            except Policy.DoesNotExist:
303
                append(idx)
304
                continue
305

    
306
            self._init_holding(e, resource, p,
307
                               imported, exported,
308
                               returned, released,
309
                               flags)
310
        if rejected:
311
            raise ReturnButFail(rejected)
312
        return rejected
313

    
314
    def reset_holding(self, context={}, reset_holding=()):
315
        rejected = []
316
        append = rejected.append
317

    
318
        for idx, tpl in enumerate(reset_holding):
319
            (entity, resource, key,
320
             imported, exported, returned, released) = tpl
321
            try:
322
                e = Entity.objects.get(entity=entity, key=key)
323
            except Entity.DoesNotExist:
324
                append(idx)
325
                continue
326

    
327
            try:
328
                h = db_get_holding(entity=entity, resource=resource,
329
                                   for_update=True)
330
                h.imported = imported
331
                h.importing = imported
332
                h.exported = exported
333
                h.exporting = exported
334
                h.returned = returned
335
                h.returning = returned
336
                h.released = released
337
                h.releasing = released
338
                h.save()
339
            except Holding.DoesNotExist:
340
                append(idx)
341
                continue
342

    
343
        if rejected:
344
            raise ReturnButFail(rejected)
345
        return rejected
346

    
347
    def _check_pending(self, entity, resource):
348
        cs = Commission.objects.filter(entity=entity)
349
        cs = [c for c in cs if c.provisions.filter(resource=resource)]
350
        as_target = [c.serial for c in cs]
351

    
352
        ps = Provision.objects.filter(entity=entity, resource=resource)
353
        as_source = [p.serial.serial for p in ps]
354

    
355
        return as_target + as_source
356

    
357
    def _actual_quantity(self, holding):
358
        hp = holding.policy
359
        return hp.quantity + (holding.imported + holding.returned -
360
                              holding.exported - holding.released)
361

    
362
    def _new_policy_name(self):
363
        return newname('policy_')
364

    
365
    def _increase_resource(self, entity, resource, amount):
366
        try:
367
            h = db_get_holding(entity=entity, resource=resource,
368
                               for_update=True)
369
        except Holding.DoesNotExist:
370
            h = Holding(entity=entity, resource=resource)
371
            p = Policy.objects.create(policy=self._new_policy_name(),
372
                                      quantity=0,
373
                                      capacity=QH_PRACTICALLY_INFINITE,
374
                                      import_limit=QH_PRACTICALLY_INFINITE,
375
                                      export_limit=QH_PRACTICALLY_INFINITE)
376
            h.policy = p
377
        h.imported += amount
378
        h.save()
379

    
380
    def release_holding(self, context={}, release_holding=()):
381
        rejected = []
382
        append = rejected.append
383

    
384
        for idx, (entity, resource, key) in enumerate(release_holding):
385
            try:
386
                h = db_get_holding(entity=entity, resource=resource,
387
                                   for_update=True)
388
            except Holding.DoesNotExist:
389
                append(idx)
390
                continue
391

    
392
            if h.entity.key != key:
393
                append(idx)
394
                continue
395

    
396
            if self._check_pending(entity, resource):
397
                append(idx)
398
                continue
399

    
400
            q = self._actual_quantity(h)
401
            if q > 0:
402
                owner = h.entity.owner
403
                self._increase_resource(owner, resource, q)
404

    
405
            h.delete()
406

    
407
        if rejected:
408
            raise ReturnButFail(rejected)
409
        return rejected
410

    
411
    def list_resources(self, context={}, entity=None, key=None):
412
        try:
413
            e = Entity.objects.get(entity=entity)
414
        except Entity.DoesNotExist:
415
            m = "No such entity '%s'" % (entity,)
416
            raise NoEntityError(m)
417

    
418
        if e.key != key:
419
            m = "Invalid key for entity '%s'" % (entity,)
420
            raise InvalidKeyError(m)
421

    
422
        holdings = e.holding_set.filter(entity=entity)
423
        resources = [h.resource for h in holdings]
424
        return resources
425

    
426
    def list_holdings(self, context={}, list_holdings=()):
427
        rejected = []
428
        reject = rejected.append
429
        holdings_list = []
430
        append = holdings_list.append
431

    
432
        for entity, key in list_holdings:
433
            try:
434
                e = Entity.objects.get(entity=entity)
435
                if e.key != key:
436
                    raise Entity.DoesNotExist("wrong key")
437
            except Entity.DoesNotExist:
438
                reject(entity)
439
                continue
440

    
441
            holdings = e.holding_set.filter(entity=entity)
442
            append([[entity, h.resource,
443
                     h.imported, h.exported, h.returned, h.released]
444
                    for h in holdings])
445

    
446
        return holdings_list, rejected
447

    
448
    def get_quota(self, context={}, get_quota=()):
449
        quotas = []
450
        append = quotas.append
451

    
452
        entities = set(e for e, r, k in get_quota)
453
        hs = Holding.objects.select_related().filter(entity__in=entities)
454
        holdings = {}
455
        for h in hs:
456
            holdings[(h.entity_id, h.resource)] = h
457

    
458
        for entity, resource, key in get_quota:
459
            try:
460
                h = holdings[(entity, resource)]
461
            except:
462
                continue
463

    
464
            if h.entity.key != key:
465
                continue
466

    
467
            p = h.policy
468

    
469
            append((h.entity.entity, h.resource, p.quantity, p.capacity,
470
                    p.import_limit, p.export_limit,
471
                    h.imported, h.exported,
472
                    h.returned, h.released,
473
                    h.flags))
474

    
475
        return quotas
476

    
477
    def set_quota(self, context={}, set_quota=()):
478
        rejected = []
479
        append = rejected.append
480

    
481
        q_holdings = Q()
482
        entities = []
483
        for (entity, resource, key, _, _, _, _, _) in set_quota:
484

    
485
            q_holdings |= Q(entity=entity, resource=resource)
486
            entities.append(entity)
487

    
488
        hs = Holding.objects.filter(q_holdings).select_for_update()
489
        holdings = {}
490
        for h in hs:
491
            holdings[(h.entity_id, h.resource)] = h
492

    
493
        entities = Entity.objects.in_bulk(entities)
494

    
495
        old_policies = []
496

    
497
        for (entity, resource, key,
498
             quantity, capacity,
499
             import_limit, export_limit, flags) in set_quota:
500

    
501
            e = entities.get(entity, None)
502
            if e is None or e.key != key:
503
                append((entity, resource))
504
                continue
505

    
506
            policy = newname('policy_')
507
            newp = Policy(policy=policy,
508
                          quantity=quantity,
509
                          capacity=capacity,
510
                          import_limit=import_limit,
511
                          export_limit=export_limit)
512

    
513
            try:
514
                h = holdings[(entity, resource)]
515
                old_policies.append(h.policy_id)
516
                h.policy = newp
517
                h.flags = flags
518
            except KeyError:
519
                h = Holding(entity=e, resource=resource,
520
                            policy=newp, flags=flags)
521

    
522
            # the order is intentionally reversed so that it
523
            # would break if we are not within a transaction.
524
            # Has helped before.
525
            h.save()
526
            newp.save()
527
            holdings[(entity, resource)] = h
528

    
529
        objs = Policy.objects.annotate(refs=Count('holding'))
530
        objs.filter(policy__in=old_policies, refs=0).delete()
531

    
532
        if rejected:
533
            raise ReturnButFail(rejected)
534
        return rejected
535

    
536
    def add_quota(self,
537
                  context={}, clientkey=None, serial=None,
538
                  sub_quota=(), add_quota=()):
539
        rejected = []
540
        append = rejected.append
541

    
542
        if serial is not None:
543
            if clientkey is None:
544
                all_pairs = [(q[0], q[1]) for q in sub_quota + add_quota]
545
                raise ReturnButFail(all_pairs)
546
            try:
547
                cs = CallSerial.objects.get(serial=serial, clientkey=clientkey)
548
                all_pairs = [(q[0], q[1]) for q in sub_quota + add_quota]
549
                raise ReturnButFail(all_pairs)
550
            except CallSerial.DoesNotExist:
551
                pass
552

    
553
        sources = sub_quota + add_quota
554
        q_holdings = Q()
555
        entities = []
556
        for (entity, resource, key, _, _, _, _) in sources:
557

    
558
            q_holdings |= Q(entity=entity, resource=resource)
559
            entities.append(entity)
560

    
561
        hs = Holding.objects.filter(q_holdings).select_for_update()
562
        holdings = {}
563
        for h in hs:
564
            holdings[(h.entity_id, h.resource)] = h
565

    
566
        entities = Entity.objects.in_bulk(entities)
567

    
568
        pids = [h.policy_id for h in hs]
569
        policies = Policy.objects.in_bulk(pids)
570

    
571
        old_policies = []
572

    
573
        for removing, source in [(True, sub_quota), (False, add_quota)]:
574
            for (entity, resource, key,
575
                 quantity, capacity,
576
                 import_limit, export_limit) in source:
577

    
578
                e = entities.get(entity, None)
579
                if e is None or e.key != key:
580
                    append((entity, resource))
581
                    continue
582

    
583
                try:
584
                    h = holdings[(entity, resource)]
585
                    old_policies.append(h.policy_id)
586
                    try:
587
                        p = policies[h.policy_id]
588
                    except KeyError:
589
                        raise AssertionError("no policy %s" % h.policy_id)
590
                except KeyError:
591
                    if removing:
592
                        append((entity, resource))
593
                        continue
594
                    h = Holding(entity=e, resource=resource, flags=0)
595
                    p = None
596

    
597
                policy = newname('policy_')
598
                newp = Policy(policy=policy)
599

    
600
                newp.quantity = _add(p.quantity if p else 0, quantity,
601
                                     invert=removing)
602
                newp.capacity = _add(p.capacity if p else 0, capacity,
603
                                     invert=removing)
604
                newp.import_limit = _add(p.import_limit if p else 0,
605
                                         import_limit, invert=removing)
606
                newp.export_limit = _add(p.export_limit if p else 0,
607
                                         export_limit, invert=removing)
608

    
609
                new_values = [newp.capacity,
610
                              newp.import_limit, newp.export_limit]
611
                if any(map(_isneg, new_values)):
612
                    append((entity, resource))
613
                    continue
614

    
615
                h.policy = newp
616

    
617
                # the order is intentionally reversed so that it
618
                # would break if we are not within a transaction.
619
                # Has helped before.
620
                h.save()
621
                newp.save()
622
                policies[policy] = newp
623
                holdings[(entity, resource)] = h
624

    
625
        objs = Policy.objects.annotate(refs=Count('holding'))
626
        objs.filter(policy__in=old_policies, refs=0).delete()
627

    
628
        if rejected:
629
            raise ReturnButFail(rejected)
630

    
631
        if serial is not None and clientkey is not None:
632
            CallSerial.objects.create(serial=serial, clientkey=clientkey)
633
        return rejected
634

    
635
    def ack_serials(self, context={}, clientkey=None, serials=()):
636
        if clientkey is None:
637
            return
638

    
639
        for serial in serials:
640
            try:
641
                c = db_get_callserial(clientkey=clientkey,
642
                                      serial=serial,
643
                                      for_update=True)
644
                c.delete()
645
            except CallSerial.DoesNotExist:
646
                pass
647
        return
648

    
649
    def query_serials(self, context={}, clientkey=None, serials=()):
650
        result = []
651
        append = result.append
652

    
653
        if clientkey is None:
654
            return result
655

    
656
        if not serials:
657
            cs = CallSerial.objects.filter(clientkey=clientkey)
658
            return [c.serial for c in cs]
659

    
660
        for serial in serials:
661
            try:
662
                db_get_callserial(clientkey=clientkey, serial=serial)
663
                append(serial)
664
            except CallSerial.DoesNotExist:
665
                pass
666

    
667
        return result
668

    
669
    def issue_commission(self,
670
                         context={},
671
                         clientkey=None,
672
                         target=None,
673
                         key=None,
674
                         name=None,
675
                         provisions=()):
676

    
677
        try:
678
            t = Entity.objects.get(entity=target)
679
        except Entity.DoesNotExist:
680
            m = "No target entity '%s'" % (target,)
681
            raise NoEntityError(m)
682
        else:
683
            if t.key != key:
684
                m = "Invalid key for target entity '%s'" % (target,)
685
                raise InvalidKeyError(m)
686

    
687
        create = Commission.objects.create
688
        commission = create(entity_id=target, clientkey=clientkey, name=name)
689
        serial = commission.serial
690

    
691
        checked = []
692
        for entity, resource, quantity in provisions:
693

    
694
            if entity == target:
695
                m = "Cannot issue commission from an entity to itself (%s)" % (
696
                    entity,)
697
                raise InvalidDataError(m)
698

    
699
            ent_res = entity, resource
700
            if ent_res in checked:
701
                m = "Duplicate provision for %s.%s" % ent_res
702
                raise DuplicateError(m)
703
            checked.append(ent_res)
704

    
705
            try:
706
                e = Entity.objects.get(entity=entity)
707
            except Entity.DoesNotExist:
708
                m = "No source entity '%s'" % (entity,)
709
                raise NoEntityError(m)
710

    
711
            release = 0
712
            if quantity < 0:
713
                release = 1
714

    
715
            # Source limits checks
716
            try:
717
                h = db_get_holding(entity=entity, resource=resource,
718
                                   for_update=True)
719
            except Holding.DoesNotExist:
720
                m = ("There is no quantity "
721
                     "to allocate from in %s.%s" % (entity, resource))
722
                raise NoQuantityError(m,
723
                                      source=entity, target=target,
724
                                      resource=resource, requested=quantity,
725
                                      current=0, limit=0)
726

    
727
            hp = h.policy
728

    
729
            if not release:
730
                current = h.exporting
731
                limit = hp.export_limit
732
                if current + quantity > limit:
733
                    m = ("Export limit reached for %s.%s" % (entity, resource))
734
                    raise ExportLimitError(m,
735
                                           source=entity,
736
                                           target=target,
737
                                           resource=resource,
738
                                           requested=quantity,
739
                                           current=current,
740
                                           limit=limit)
741

    
742
                limit = hp.quantity + h.imported - h.releasing
743
                unavailable = h.exporting - h.returned
744
                available = limit - unavailable
745

    
746
                if quantity > available:
747
                    m = ("There is not enough quantity "
748
                         "to allocate from in %s.%s" % (entity, resource))
749
                    raise NoQuantityError(m,
750
                                          source=entity,
751
                                          target=target,
752
                                          resource=resource,
753
                                          requested=quantity,
754
                                          current=unavailable,
755
                                          limit=limit)
756
            else:
757
                current = (+ h.importing + h.returning
758
                           - h.exported - h.returned)
759
                limit = hp.capacity
760
                if current - quantity > limit:
761
                    m = ("There is not enough capacity "
762
                         "to release to in %s.%s" % (entity, resource))
763
                    raise NoQuantityError(m,
764
                                          source=entity,
765
                                          target=target,
766
                                          resource=resource,
767
                                          requested=quantity,
768
                                          current=current,
769
                                          limit=limit)
770

    
771
            # Target limits checks
772
            try:
773
                th = db_get_holding(entity=target, resource=resource,
774
                                    for_update=True)
775
            except Holding.DoesNotExist:
776
                m = ("There is no capacity "
777
                     "to allocate into in %s.%s" % (target, resource))
778
                raise NoCapacityError(m,
779
                                      source=entity,
780
                                      target=target,
781
                                      resource=resource,
782
                                      requested=quantity,
783
                                      current=0,
784
                                      limit=0)
785

    
786
            tp = th.policy
787

    
788
            if not release:
789
                limit = tp.import_limit
790
                current = th.importing
791
                if current + quantity > limit:
792
                    m = ("Import limit reached for %s.%s" % (target, resource))
793
                    raise ImportLimitError(m,
794
                                           source=entity,
795
                                           target=target,
796
                                           resource=resource,
797
                                           requested=quantity,
798
                                           current=current,
799
                                           limit=limit)
800

    
801
                limit = tp.quantity + tp.capacity
802
                current = (+ th.importing + th.returning + tp.quantity
803
                           - th.exported - th.released)
804

    
805
                if current + quantity > limit:
806
                    m = ("There is not enough capacity "
807
                         "to allocate into in %s.%s" % (target, resource))
808
                    raise NoCapacityError(m,
809
                                          source=entity,
810
                                          target=target,
811
                                          resource=resource,
812
                                          requested=quantity,
813
                                          current=current,
814
                                          limit=limit)
815
            else:
816
                limit = tp.quantity + th.imported - th.releasing
817
                unavailable = th.exporting - th.returned
818
                available = limit - unavailable
819

    
820
                if available + quantity < 0:
821
                    m = ("There is not enough quantity "
822
                         "to release from in %s.%s" % (target, resource))
823
                    raise NoCapacityError(m,
824
                                          source=entity,
825
                                          target=target,
826
                                          resource=resource,
827
                                          requested=quantity,
828
                                          current=unavailable,
829
                                          limit=limit)
830

    
831
            Provision.objects.create(serial=commission,
832
                                     entity=e,
833
                                     resource=resource,
834
                                     quantity=quantity)
835
            if release:
836
                h.returning -= quantity
837
                th.releasing -= quantity
838
            else:
839
                h.exporting += quantity
840
                th.importing += quantity
841

    
842
            h.save()
843
            th.save()
844

    
845
        return serial
846

    
847
    def _log_provision(self,
848
                       commission, s_holding, t_holding,
849
                       provision, log_time, reason):
850

    
851
        s_entity = s_holding.entity
852
        s_policy = s_holding.policy
853
        t_entity = t_holding.entity
854
        t_policy = t_holding.policy
855

    
856
        kwargs = {
857
            'serial':              commission.serial,
858
            'name':                commission.name,
859
            'source':              s_entity.entity,
860
            'target':              t_entity.entity,
861
            'resource':            provision.resource,
862
            'source_quantity':     s_policy.quantity,
863
            'source_capacity':     s_policy.capacity,
864
            'source_import_limit': s_policy.import_limit,
865
            'source_export_limit': s_policy.export_limit,
866
            'source_imported':     s_holding.imported,
867
            'source_exported':     s_holding.exported,
868
            'source_returned':     s_holding.returned,
869
            'source_released':     s_holding.released,
870
            'target_quantity':     t_policy.quantity,
871
            'target_capacity':     t_policy.capacity,
872
            'target_import_limit': t_policy.import_limit,
873
            'target_export_limit': t_policy.export_limit,
874
            'target_imported':     t_holding.imported,
875
            'target_exported':     t_holding.exported,
876
            'target_returned':     t_holding.returned,
877
            'target_released':     t_holding.released,
878
            'delta_quantity':      provision.quantity,
879
            'issue_time':          commission.issue_time,
880
            'log_time':            log_time,
881
            'reason':              reason,
882
        }
883

    
884
        ProvisionLog.objects.create(**kwargs)
885

    
886
    def accept_commission(self,
887
                          context={}, clientkey=None,
888
                          serials=(), reason=''):
889
        log_time = now()
890

    
891
        for serial in serials:
892
            try:
893
                c = db_get_commission(clientkey=clientkey, serial=serial,
894
                                      for_update=True)
895
            except Commission.DoesNotExist:
896
                return
897

    
898
            t = c.entity
899

    
900
            provisions = db_filter_provision(serial=serial, for_update=True)
901
            for pv in provisions:
902
                try:
903
                    h = db_get_holding(entity=pv.entity.entity,
904
                                       resource=pv.resource, for_update=True)
905
                    th = db_get_holding(entity=t, resource=pv.resource,
906
                                        for_update=True)
907
                except Holding.DoesNotExist:
908
                    m = "Corrupted provision"
909
                    raise CorruptedError(m)
910

    
911
                quantity = pv.quantity
912
                release = 0
913
                if quantity < 0:
914
                    release = 1
915

    
916
                if release:
917
                    h.returned -= quantity
918
                    th.released -= quantity
919
                else:
920
                    h.exported += quantity
921
                    th.imported += quantity
922

    
923
                reason = 'ACCEPT:' + reason[-121:]
924
                self._log_provision(c, h, th, pv, log_time, reason)
925
                h.save()
926
                th.save()
927
                pv.delete()
928
            c.delete()
929

    
930
        return
931

    
932
    def reject_commission(self,
933
                          context={}, clientkey=None,
934
                          serials=(), reason=''):
935
        log_time = now()
936

    
937
        for serial in serials:
938
            try:
939
                c = db_get_commission(clientkey=clientkey, serial=serial,
940
                                      for_update=True)
941
            except Commission.DoesNotExist:
942
                return
943

    
944
            t = c.entity
945

    
946
            provisions = db_filter_provision(serial=serial, for_update=True)
947
            for pv in provisions:
948
                try:
949
                    h = db_get_holding(entity=pv.entity.entity,
950
                                       resource=pv.resource, for_update=True)
951
                    th = db_get_holding(entity=t, resource=pv.resource,
952
                                        for_update=True)
953
                except Holding.DoesNotExist:
954
                    m = "Corrupted provision"
955
                    raise CorruptedError(m)
956

    
957
                quantity = pv.quantity
958
                release = 0
959
                if quantity < 0:
960
                    release = 1
961

    
962
                if release:
963
                    h.returning += quantity
964
                    th.releasing += quantity
965
                else:
966
                    h.exporting -= quantity
967
                    th.importing -= quantity
968

    
969
                reason = 'REJECT:' + reason[-121:]
970
                self._log_provision(c, h, th, pv, log_time, reason)
971
                h.save()
972
                th.save()
973
                pv.delete()
974
            c.delete()
975

    
976
        return
977

    
978
    def get_pending_commissions(self, context={}, clientkey=None):
979
        pending = Commission.objects.filter(clientkey=clientkey)
980
        pending_list = pending.values_list('serial', flat=True)
981
        return pending_list
982

    
983
    def resolve_pending_commissions(self,
984
                                    context={}, clientkey=None,
985
                                    max_serial=None, accept_set=()):
986
        accept_set = set(accept_set)
987
        pending = self.get_pending_commissions(context=context,
988
                                               clientkey=clientkey)
989
        pending = sorted(pending)
990

    
991
        accept = self.accept_commission
992
        reject = self.reject_commission
993

    
994
        for serial in pending:
995
            if serial > max_serial:
996
                break
997

    
998
            if serial in accept_set:
999
                accept(context=context, clientkey=clientkey, serials=[serial])
1000
            else:
1001
                reject(context=context, clientkey=clientkey, serials=[serial])
1002

    
1003
        return
1004

    
1005
    def release_entity(self, context={}, release_entity=()):
1006
        rejected = []
1007
        append = rejected.append
1008
        for entity, key in release_entity:
1009
            try:
1010
                e = db_get_entity(entity=entity, key=key, for_update=True)
1011
            except Entity.DoesNotExist:
1012
                append(entity)
1013
                continue
1014

    
1015
            if e.entities.count() != 0:
1016
                append(entity)
1017
                continue
1018

    
1019
            if e.holding_set.count() != 0:
1020
                append(entity)
1021
                continue
1022

    
1023
            e.delete()
1024

    
1025
        if rejected:
1026
            raise ReturnButFail(rejected)
1027
        return rejected
1028

    
1029
    def get_timeline(self, context={}, after="", before="Z", get_timeline=()):
1030
        entity_set = set()
1031
        e_add = entity_set.add
1032
        resource_set = set()
1033
        r_add = resource_set.add
1034

    
1035
        for entity, resource, key in get_timeline:
1036
            if entity not in entity_set:
1037
                try:
1038
                    e = Entity.objects.get(entity=entity, key=key)
1039
                    e_add(entity)
1040
                except Entity.DoesNotExist:
1041
                    continue
1042

    
1043
            r_add((entity, resource))
1044

    
1045
        chunk_size = 65536
1046
        nr = 0
1047
        timeline = []
1048
        append = timeline.append
1049
        filterlogs = ProvisionLog.objects.filter
1050
        if entity_set:
1051
            q_entity = Q(source__in=entity_set) | Q(target__in=entity_set)
1052
        else:
1053
            q_entity = Q()
1054

    
1055
        while 1:
1056
            logs = filterlogs(q_entity,
1057
                              issue_time__gt=after,
1058
                              issue_time__lte=before,
1059
                              reason__startswith='ACCEPT:')
1060

    
1061
            logs = logs.order_by('issue_time')
1062
            #logs = logs.values()
1063
            logs = logs[:chunk_size]
1064
            nr += len(logs)
1065
            if not logs:
1066
                break
1067
            for g in logs:
1068
                if ((g.source, g.resource) not in resource_set
1069
                    or (g.target, g.resource) not in resource_set):
1070
                    continue
1071

    
1072
                o = {
1073
                    'serial':                   g.serial,
1074
                    'source':                   g.source,
1075
                    'target':                   g.target,
1076
                    'resource':                 g.resource,
1077
                    'name':                     g.name,
1078
                    'quantity':                 g.delta_quantity,
1079
                    'source_allocated':         g.source_allocated(),
1080
                    'source_allocated_through': g.source_allocated_through(),
1081
                    'source_inbound':           g.source_inbound(),
1082
                    'source_inbound_through':   g.source_inbound_through(),
1083
                    'source_outbound':          g.source_outbound(),
1084
                    'source_outbound_through':  g.source_outbound_through(),
1085
                    'target_allocated':         g.target_allocated(),
1086
                    'target_allocated_through': g.target_allocated_through(),
1087
                    'target_inbound':           g.target_inbound(),
1088
                    'target_inbound_through':   g.target_inbound_through(),
1089
                    'target_outbound':          g.target_outbound(),
1090
                    'target_outbound_through':  g.target_outbound_through(),
1091
                    'issue_time':               g.issue_time,
1092
                    'log_time':                 g.log_time,
1093
                    'reason':                   g.reason,
1094
                }
1095

    
1096
                append(o)
1097

    
1098
            after = g.issue_time
1099
            if after >= before:
1100
                break
1101

    
1102
        return timeline
1103

    
1104

    
1105
def _add(x, y, invert=False):
1106
    return x + y if not invert else x - y
1107

    
1108

    
1109
def _update(dest, source, attr, delta):
1110
    dest_attr = getattr(dest, attr)
1111
    dest_attr = _add(getattr(source, attr, 0), delta)
1112

    
1113

    
1114
def _isneg(x):
1115
    return x < 0
1116

    
1117

    
1118
API_Callpoint = QuotaholderDjangoDBCallpoint