Statistics
| Branch: | Tag: | Revision:

root / snf-quotaholder-app / quotaholder_django / quotaholder_app / callpoint.py @ d2b32360

History | View | Annotate | Download (34.7 kB)

1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from synnefo.lib.quotaholder.api import (
35
                            QuotaholderAPI,
36
                            InvalidKeyError, NoEntityError,
37
                            NoQuantityError, NoCapacityError,
38
                            ExportLimitError, ImportLimitError,
39
                            DuplicateError)
40

    
41
from synnefo.lib.commissioning import \
42
    Callpoint, CorruptedError, InvalidDataError, ReturnButFail
43
from synnefo.lib.commissioning.utils.newname import newname
44

    
45
from django.db.models import Q
46
from django.db import transaction, IntegrityError
47
from .models import (Holder, Entity, Policy, Holding,
48
                     Commission, Provision, ProvisionLog, CallSerial,
49
                     now,
50
                     db_get_entity, db_get_holding, db_get_policy,
51
                     db_get_commission, db_filter_provision, db_get_callserial)
52

    
53
class QuotaholderDjangoDBCallpoint(Callpoint):
54

    
55
    api_spec = QuotaholderAPI()
56

    
57
    http_exc_lookup = {
58
        CorruptedError:   550,
59
        InvalidDataError: 400,
60
        InvalidKeyError:  401,
61
        NoEntityError:    404,
62
        NoQuantityError:  413,
63
        NoCapacityError:  413,
64
    }
65

    
66
    def init_connection(self, connection):
67
        if connection is not None:
68
            raise ValueError("Cannot specify connection args with %s" %
69
                             type(self).__name__)
70
        pass
71

    
72
    def commit(self):
73
        transaction.commit()
74

    
75
    def rollback(self):
76
        transaction.rollback()
77

    
78
    def do_make_call(self, call_name, data):
79
        call_fn = getattr(self, call_name, None)
80
        if not call_fn:
81
            m = "cannot find call '%s'" % (call_name,)
82
            raise CorruptedError(m)
83

    
84
        return call_fn(**data)
85

    
86
    def create_entity(self, context={}, create_entity=()):
87
        rejected = []
88
        append = rejected.append
89

    
90
        for idx, (entity, owner, key, ownerkey) in enumerate(create_entity):
91
            try:
92
                owner = Entity.objects.get(entity=owner, key=ownerkey)
93
            except Entity.DoesNotExist:
94
                append(idx)
95
                continue
96

    
97
            try:
98
                e = Entity.objects.get(entity=entity)
99
                append(idx)
100
            except Entity.DoesNotExist:
101
                e = Entity.objects.create(entity=entity,
102
                                          owner=owner,
103
                                          key=key)
104

    
105
        if rejected:
106
            raise ReturnButFail(rejected)
107
        return rejected
108

    
109
    def set_entity_key(self, context={}, set_entity_key=()):
110
        rejected = []
111
        append = rejected.append
112

    
113
        for entity, key, newkey in set_entity_key:
114
            try:
115
                e = db_get_entity(entity=entity, key=key, for_update=True)
116
            except Entity.DoesNotExist:
117
                append(entity)
118
                continue
119

    
120
            e.key = newkey
121
            e.save()
122

    
123
        if rejected:
124
            raise ReturnButFail(rejected)
125
        return rejected
126

    
127
    def list_entities(self, context={}, entity=None, key=None):
128
        try:
129
            e = Entity.objects.get(entity=entity, key=key)
130
        except Entity.DoesNotExist:
131
            m = "Entity '%s' does not exist" % (entity,)
132
            raise NoEntityError(m)
133

    
134
        children = e.entities.all()
135
        entities = [e.entity for e in children]
136
        return entities
137

    
138
    def get_entity(self, context={}, get_entity=()):
139
        entities = []
140
        append = entities.append
141

    
142
        for entity, key in get_entity:
143
            try:
144
                e = Entity.objects.get(entity=entity, key=key)
145
            except Entity.DoesNotExist:
146
                continue
147

    
148
            append((entity, e.owner.entity))
149

    
150
        return entities
151

    
152
    def get_limits(self, context={}, get_limits=()):
153
        limits = []
154
        append = limits.append
155

    
156
        for policy in get_limits:
157
            try:
158
                p = Policy.objects.get(policy=policy)
159
            except Policy.DoesNotExist:
160
                continue
161

    
162
            append((policy, p.quantity, p.capacity,
163
                    p.import_limit, p.export_limit))
164

    
165
        return limits
166

    
167
    def set_limits(self, context={}, set_limits=()):
168

    
169
        for (   policy, quantity, capacity,
170
                import_limit, export_limit  ) in set_limits:
171

    
172
                try:
173
                    policy = db_get_policy(policy=policy, for_update=True)
174
                except Policy.DoesNotExist:
175
                    Policy.objects.create(  policy=policy,
176
                                            quantity=quantity,
177
                                            capacity=capacity,
178
                                            import_limit=import_limit,
179
                                            export_limit=export_limit   )
180
                else:
181
                    policy.quantity = quantity
182
                    policy.capacity = capacity
183
                    policy.export_limit = export_limit
184
                    policy.import_limit = import_limit
185
                    policy.save()
186

    
187
        return ()
188

    
189
    def get_holding(self, context={}, get_holding=()):
190
        holdings = []
191
        append = holdings.append
192

    
193
        for entity, resource, key in get_holding:
194
            try:
195
                h = Holding.objects.get(entity=entity, resource=resource)
196
            except Holding.DoesNotExist:
197
                continue
198

    
199
            if h.entity.key != key:
200
                continue
201

    
202
            append((h.entity.entity, h.resource, h.policy.policy,
203
                    h.imported, h.exported,
204
                    h.returned, h.released, h.flags))
205

    
206
        return holdings
207

    
208
    def _set_holding(self, entity, resource, policy, flags):
209
        try:
210
            h = db_get_holding(entity=entity, resource=resource,
211
                               for_update=True)
212
            h.policy = p
213
            h.flags = flags
214
            h.save()
215
        except Holding.DoesNotExist:
216
            h = Holding.objects.create( entity=e, resource=resource,
217
                                        policy=p, flags=flags      )
218
        return h
219

    
220
    def set_holding(self, context={}, set_holding=()):
221
        rejected = []
222
        append = rejected.append
223

    
224
        for entity, resource, key, policy, flags in set_holding:
225
            try:
226
                e = Entity.objects.get(entity=entity, key=key)
227
            except Entity.DoesNotExist:
228
                append((entity, resource, policy))
229
                continue
230

    
231
            if e.key != key:
232
                append((entity, resource, policy))
233
                continue
234

    
235
            try:
236
                p = Policy.objects.get(policy=policy)
237
            except Policy.DoesNotExist:
238
                append((entity, resource, policy))
239
                continue
240

    
241
            try:
242
                h = db_get_holding(entity=entity, resource=resource,
243
                                   for_update=True)
244
                h.policy = p
245
                h.flags = flags
246
                h.save()
247
            except Holding.DoesNotExist:
248
                h = Holding.objects.create( entity=e, resource=resource,
249
                                            policy=p, flags=flags      )
250

    
251
        if rejected:
252
            raise ReturnButFail(rejected)
253
        return rejected
254

    
255
    def _init_holding(self, entity, resource, policy,
256
                          imported, exported, returned, released,
257
                          flags):
258
        try:
259
            h = db_get_holding(entity=entity, resource=resource,
260
                               for_update=True)
261
        except Holding.DoesNotExist:
262
            h = Holding(entity=entity, resource=resource)
263

    
264
        h.policy = policy
265
        h.flags = flags
266
        h.imported=imported
267
        h.importing=imported
268
        h.exported=exported
269
        h.exporting=exported
270
        h.returned=returned
271
        h.returning=returned
272
        h.released=released
273
        h.releasing=released
274
        h.save()
275

    
276
    def init_holding(self, context={}, init_holding=()):
277
        rejected = []
278
        append = rejected.append
279

    
280
        for idx, sfh in enumerate(init_holding):
281
            (entity, resource, key, policy,
282
             imported, exported, returned, released,
283
             flags) = sfh
284
            try:
285
                e = Entity.objects.get(entity=entity, key=key)
286
            except Entity.DoesNotExist:
287
                append(idx)
288
                continue
289

    
290
            if e.key != key:
291
                append(idx)
292
                continue
293

    
294
            try:
295
                p = Policy.objects.get(policy=policy)
296
            except Policy.DoesNotExist:
297
                append(idx)
298
                continue
299

    
300
            self._init_holding(e, resource, p,
301
                                   imported, exported,
302
                                   returned, released,
303
                                   flags)
304
        if rejected:
305
            raise ReturnButFail(rejected)
306
        return rejected
307

    
308
    def reset_holding(self, context={}, reset_holding=()):
309
        rejected = []
310
        append = rejected.append
311

    
312
        for idx, tpl in enumerate(reset_holding):
313
            (entity, resource, key,
314
             imported, exported, returned, released) = tpl
315
            try:
316
                e = Entity.objects.get(entity=entity, key=key)
317
            except Entity.DoesNotExist:
318
                append(idx)
319
                continue
320

    
321
            try:
322
                h = db_get_holding(entity=entity, resource=resource,
323
                                   for_update=True)
324
                h.imported=imported
325
                h.importing=imported
326
                h.exported=exported
327
                h.exporting=exported
328
                h.returned=returned
329
                h.returning=returned
330
                h.released=released
331
                h.releasing=released
332
                h.save()
333
            except Holding.DoesNotExist:
334
                append(idx)
335
                continue
336

    
337
        if rejected:
338
            raise ReturnButFail(rejected)
339
        return rejected
340

    
341
    def _check_pending(self, entity, resource):
342
        cs = Commission.objects.filter(entity=entity)
343
        cs = [c for c in cs if c.provisions.filter(resource=resource)]
344
        as_target = [c.serial for c in cs]
345

    
346
        ps = Provision.objects.filter(entity=entity, resource=resource)
347
        as_source = [p.serial.serial for p in ps]
348

    
349
        return as_target + as_source
350

    
351
    def _actual_quantity(self, holding):
352
        hp = holding.policy
353
        return hp.quantity + (holding.imported + holding.returned -
354
                              holding.exported - holding.released)
355

    
356
    def _new_policy_name(self):
357
        return newname('policy_')
358

    
359
    def _increase_resource(self, entity, resource, amount):
360
        try:
361
            h = db_get_holding(entity=entity, resource=resource,
362
                               for_update=True)
363
        except Holding.DoesNotExist:
364
            h = Holding(entity=entity, resource=resource)
365
            p = Policy.objects.create(policy=self._new_policy_name(),
366
                                      quantity=0)
367
            h.policy = p
368
        h.imported += amount
369
        h.save()
370

    
371
    def release_holding(self, context={}, release_holding=()):
372
        rejected = []
373
        append = rejected.append
374

    
375
        for idx, (entity, resource, key) in enumerate(release_holding):
376
            try:
377
                h = db_get_holding(entity=entity, resource=resource,
378
                                   for_update=True)
379
            except Holding.DoesNotExist:
380
                append(idx)
381
                continue
382

    
383
            if h.entity.key != key:
384
                append(idx)
385
                continue
386

    
387
            if self._check_pending(entity, resource):
388
                append(idx)
389
                continue
390

    
391
            q = self._actual_quantity(h)
392
            if q > 0:
393
                owner = h.entity.owner
394
                self._increase_resource(owner, resource, q)
395

    
396
            h.delete()
397

    
398
        if rejected:
399
            raise ReturnButFail(rejected)
400
        return rejected
401

    
402
    def list_resources(self, context={}, entity=None, key=None):
403
        try:
404
            e = Entity.objects.get(entity=entity)
405
        except Entity.DoesNotExist:
406
            m = "No such entity '%s'" % (entity,)
407
            raise NoEntityError(m)
408

    
409
        if e.key != key:
410
            m = "Invalid key for entity '%s'" % (entity,)
411
            raise InvalidKeyError(m)
412

    
413
        holdings = e.holding_set.filter(entity=entity)
414
        resources = [h.resource for h in holdings]
415
        return resources
416

    
417
    def list_holdings(self, context={}, list_holdings=()):
418
        rejected = []
419
        reject = rejected.append
420
        holdings_list = []
421
        append = holdings_list.append
422

    
423
        for entity, key in list_holdings:
424
            try:
425
                e = Entity.objects.get(entity=entity)
426
                if e.key != key:
427
                    raise Entity.DoesNotExist("wrong key")
428
            except Entity.DoesNotExist:
429
                reject(entity)
430
                continue
431

    
432
            holdings = e.holding_set.filter(entity=entity)
433
            append([[entity, h.resource,
434
                     h.imported, h.exported, h.returned, h.released]
435
                        for h in holdings])
436

    
437
        return holdings_list, rejected
438

    
439
    def get_quota(self, context={}, get_quota=()):
440
        quotas = []
441
        append = quotas.append
442

    
443
        for entity, resource, key in get_quota:
444
            try:
445
                h = Holding.objects.get(entity=entity, resource=resource)
446
            except Holding.DoesNotExist:
447
                continue
448

    
449
            if h.entity.key != key:
450
                continue
451

    
452
            p = h.policy
453

    
454
            append((h.entity.entity, h.resource, p.quantity, p.capacity,
455
                    p.import_limit, p.export_limit,
456
                    h.imported, h.exported,
457
                    h.returned, h.released,
458
                    h.flags))
459

    
460
        return quotas
461

    
462
    def set_quota(self, context={}, set_quota=()):
463
        rejected = []
464
        append = rejected.append
465

    
466
        for (   entity, resource, key,
467
                quantity, capacity,
468
                import_limit, export_limit, flags  ) in set_quota:
469

    
470
                try:
471
                    e = Entity.objects.get(entity=entity, key=key)
472
                except Entity.DoesNotExist:
473
                    append((entity, resource))
474
                    continue
475

    
476
                policy = newname('policy_')
477
                newp = Policy   (
478
                            policy=policy,
479
                            quantity=quantity,
480
                            capacity=capacity,
481
                            import_limit=import_limit,
482
                            export_limit=export_limit
483
                )
484

    
485
                try:
486
                    h = db_get_holding(entity=entity, resource=resource,
487
                                       for_update=True)
488
                    p = h.policy
489
                    h.policy = newp
490
                    h.flags = flags
491
                except Holding.DoesNotExist:
492
                    h = Holding(entity=e, resource=resource,
493
                                policy=newp, flags=flags)
494
                    p = None
495

    
496
                # the order is intentionally reversed so that it
497
                # would break if we are not within a transaction.
498
                # Has helped before.
499
                h.save()
500
                newp.save()
501

    
502
                if p is not None and p.holding_set.count() == 0:
503
                    p.delete()
504

    
505
        if rejected:
506
            raise ReturnButFail(rejected)
507
        return rejected
508

    
509
    def add_quota(self, context={}, clientkey=None, serial=None,
510
                  sub_quota=(), add_quota=()):
511
        rejected = []
512
        append = rejected.append
513

    
514
        if serial is not None:
515
            if clientkey is None:
516
                all_pairs = [(q[0], q[1]) for q in sub_quota + add_quota]
517
                raise ReturnButFail(all_pairs)
518
            try:
519
                cs = CallSerial.objects.get(serial=serial, clientkey=clientkey)
520
                all_pairs = [(q[0], q[1]) for q in sub_quota + add_quota]
521
                raise ReturnButFail(all_pairs)
522
            except CallSerial.DoesNotExist:
523
                pass
524

    
525
        for removing, source in [(True, sub_quota), (False, add_quota)]:
526
            for (   entity, resource, key,
527
                    quantity, capacity,
528
                    import_limit, export_limit ) in source:
529

    
530
                    try:
531
                        e = Entity.objects.get(entity=entity, key=key)
532
                    except Entity.DoesNotExist:
533
                        append((entity, resource))
534
                        continue
535

    
536
                    try:
537
                        h = db_get_holding(entity=entity, resource=resource,
538
                                           for_update=True)
539
                        p = h.policy
540
                    except Holding.DoesNotExist:
541
                        if removing:
542
                            append((entity, resource))
543
                            continue
544
                        h = Holding(entity=e, resource=resource, flags=0)
545
                        p = None
546

    
547
                    policy = newname('policy_')
548
                    newp = Policy(policy=policy)
549

    
550
                    newp.quantity = _add(p.quantity if p else 0, quantity,
551
                                         invert=removing)
552
                    newp.capacity = _add(p.capacity if p else 0, capacity,
553
                                         invert=removing)
554
                    newp.import_limit = _add(p.import_limit if p else 0,
555
                                                  import_limit, invert=removing)
556
                    newp.export_limit = _add(p.export_limit if p else 0,
557
                                                  export_limit, invert=removing)
558

    
559
                    new_values = [newp.capacity,
560
                                  newp.import_limit, newp.export_limit]
561
                    if any(map(_isneg, new_values)):
562
                        append((entity, resource))
563
                        continue
564

    
565
                    h.policy = newp
566

    
567
                    # the order is intentionally reversed so that it
568
                    # would break if we are not within a transaction.
569
                    # Has helped before.
570
                    h.save()
571
                    newp.save()
572

    
573
                    if p is not None and p.holding_set.count() == 0:
574
                        p.delete()
575

    
576
        if rejected:
577
            raise ReturnButFail(rejected)
578

    
579
        if serial is not None and clientkey is not None:
580
            CallSerial.objects.create(serial=serial, clientkey=clientkey)
581
        return rejected
582

    
583
    def ack_serials(self, context={}, clientkey=None, serials=()):
584
        if clientkey is None:
585
            return
586

    
587
        for serial in serials:
588
            try:
589
                c = db_get_callserial(clientkey=clientkey,
590
                                      serial=serial,
591
                                      for_update=True)
592
                c.delete()
593
            except CallSerial.DoesNotExist:
594
                pass
595
        return
596

    
597
    def query_serials(self, context={}, clientkey=None, serials=()):
598
        result = []
599
        append = result.append
600

    
601
        if clientkey is None:
602
            return result
603

    
604
        if not serials:
605
            cs = CallSerial.objects.filter(clientkey=clientkey)
606
            return [c.serial for c in cs]
607

    
608
        for serial in serials:
609
            try:
610
                db_get_callserial(clientkey=clientkey, serial=serial)
611
                append(serial)
612
            except CallSerial.DoesNotExist:
613
                pass
614

    
615
        return result
616

    
617
    def issue_commission(self,  context     =   {},
618
                                clientkey   =   None,
619
                                target      =   None,
620
                                key         =   None,
621
                                name        =   None,
622
                                provisions  =   ()  ):
623

    
624
        try:
625
            t = Entity.objects.get(entity=target)
626
        except Entity.DoesNotExist:
627
            m = "No target entity '%s'" % (target,)
628
            raise NoEntityError(m)
629
        else:
630
            if t.key != key:
631
                m = "Invalid key for target entity '%s'" % (target,)
632
                raise InvalidKeyError(m)
633

    
634
        create = Commission.objects.create
635
        commission = create(entity_id=target, clientkey=clientkey, name=name)
636
        serial = commission.serial
637

    
638
        checked = []
639
        for entity, resource, quantity in provisions:
640

    
641
            if entity == target:
642
                m = "Cannot issue commission from an entity to itself (%s)" % (
643
                    entity,)
644
                raise InvalidDataError(m)
645

    
646
            ent_res = entity, resource
647
            if ent_res in checked:
648
                m = "Duplicate provision for %s.%s" % ent_res
649
                raise DuplicateError(m)
650
            checked.append(ent_res)
651

    
652
            try:
653
                e = Entity.objects.get(entity=entity)
654
            except Entity.DoesNotExist:
655
                m = "No source entity '%s'" % (entity,)
656
                raise NoEntityError(m)
657

    
658
            release = 0
659
            if quantity < 0:
660
                release = 1
661

    
662
            try:
663
                h = db_get_holding(entity=entity, resource=resource,
664
                                   for_update=True)
665
            except Holding.DoesNotExist:
666
                m = ("There is not enough quantity "
667
                     "to allocate from in %s.%s" % (entity, resource))
668
                raise NoQuantityError(m)
669

    
670
            hp = h.policy
671

    
672
            if (hp.export_limit is not None and
673
                h.exporting + quantity > hp.export_limit):
674
                    m = ("Export limit reached for %s.%s" % (entity, resource))
675
                    raise ExportLimitError(m)
676

    
677
            if hp.quantity is not None:
678
                available = (+ hp.quantity + h.imported + h.returned
679
                             - h.exporting - h.releasing)
680

    
681
                if available - quantity < 0:
682
                    m = ("There is not enough quantity "
683
                         "to allocate from in %s.%s" % (entity, resource))
684
                    raise NoQuantityError(m)
685

    
686
            try:
687
                th = db_get_holding(entity=target, resource=resource,
688
                                    for_update=True)
689
            except Holding.DoesNotExist:
690
                m = ("There is not enough capacity "
691
                     "to allocate into in %s.%s" % (target, resource))
692
                raise NoCapacityError(m)
693

    
694
            tp = th.policy
695

    
696
            if (tp.import_limit is not None and
697
                th.importing + quantity > tp.import_limit):
698
                    m = ("Import limit reached for %s.%s" % (target, resource))
699
                    raise ImportLimitError(m)
700

    
701
            if tp.capacity is not None:
702
                capacity = (+ tp.capacity + th.exported + th.released
703
                            - th.importing - th.returning)
704

    
705
                if capacity - quantity < 0:
706
                        m = ("There is not enough capacity "
707
                             "to allocate into in %s.%s" % (target, resource))
708
                        raise NoCapacityError(m)
709

    
710
            Provision.objects.create(   serial      =   commission,
711
                                        entity      =   e,
712
                                        resource    =   resource,
713
                                        quantity    =   quantity   )
714
            if release:
715
                h.returning -= quantity
716
                th.releasing -= quantity
717
            else:
718
                h.exporting += quantity
719
                th.importing += quantity
720

    
721
            h.save()
722
            th.save()
723

    
724
        return serial
725

    
726
    def _log_provision(self, commission, s_holding, t_holding,
727
                             provision, log_time, reason):
728

    
729
        s_entity = s_holding.entity
730
        s_policy = s_holding.policy
731
        t_entity = t_holding.entity
732
        t_policy = t_holding.policy
733

    
734
        ProvisionLog.objects.create(
735
                        serial              =   commission.serial,
736
                        name                =   commission.name,
737
                        source              =   s_entity.entity,
738
                        target              =   t_entity.entity,
739
                        resource            =   provision.resource,
740
                        source_quantity     =   s_policy.quantity,
741
                        source_capacity     =   s_policy.capacity,
742
                        source_import_limit =   s_policy.import_limit,
743
                        source_export_limit =   s_policy.export_limit,
744
                        source_imported     =   s_holding.imported,
745
                        source_exported     =   s_holding.exported,
746
                        source_returned     =   s_holding.returned,
747
                        source_released     =   s_holding.released,
748
                        target_quantity     =   t_policy.quantity,
749
                        target_capacity     =   t_policy.capacity,
750
                        target_import_limit =   t_policy.import_limit,
751
                        target_export_limit =   t_policy.export_limit,
752
                        target_imported     =   t_holding.imported,
753
                        target_exported     =   t_holding.exported,
754
                        target_returned     =   t_holding.returned,
755
                        target_released     =   t_holding.released,
756
                        delta_quantity      =   provision.quantity,
757
                        issue_time          =   commission.issue_time,
758
                        log_time            =   log_time,
759
                        reason              =   reason)
760

    
761
    def accept_commission(self, context={}, clientkey=None,
762
                                serials=(), reason=''):
763
        log_time = now()
764

    
765
        for serial in serials:
766
            try:
767
                c = db_get_commission(clientkey=clientkey, serial=serial,
768
                                      for_update=True)
769
            except Commission.DoesNotExist:
770
                return
771

    
772
            t = c.entity
773

    
774
            provisions = db_filter_provision(serial=serial, for_update=True)
775
            for pv in provisions:
776
                try:
777
                    h = db_get_holding(entity=pv.entity.entity,
778
                                       resource=pv.resource, for_update=True)
779
                    th = db_get_holding(entity=t, resource=pv.resource,
780
                                        for_update=True)
781
                except Holding.DoesNotExist:
782
                    m = "Corrupted provision"
783
                    raise CorruptedError(m)
784

    
785
                quantity = pv.quantity
786
                release = 0
787
                if quantity < 0:
788
                    release = 1
789

    
790
                if release:
791
                    h.returned -= quantity
792
                    th.released -= quantity
793
                else:
794
                    h.exported += quantity
795
                    th.imported += quantity
796

    
797
                reason = 'ACCEPT:' + reason[-121:]
798
                self._log_provision(c, h, th, pv, log_time, reason)
799
                h.save()
800
                th.save()
801
                pv.delete()
802
            c.delete()
803

    
804
        return
805

    
806
    def reject_commission(self, context={}, clientkey=None,
807
                                serials=(), reason=''):
808
        log_time = now()
809

    
810
        for serial in serials:
811
            try:
812
                c = db_get_commission(clientkey=clientkey, serial=serial,
813
                                      for_update=True)
814
            except Commission.DoesNotExist:
815
                return
816

    
817
            t = c.entity
818

    
819
            provisions = db_filter_provision(serial=serial, for_update=True)
820
            for pv in provisions:
821
                try:
822
                    h = db_get_holding(entity=pv.entity.entity,
823
                                       resource=pv.resource, for_update=True)
824
                    th = db_get_holding(entity=t, resource=pv.resource,
825
                                        for_update=True)
826
                except Holding.DoesNotExist:
827
                    m = "Corrupted provision"
828
                    raise CorruptedError(m)
829

    
830
                quantity = pv.quantity
831
                release = 0
832
                if quantity < 0:
833
                    release = 1
834

    
835
                if release:
836
                    h.returning += quantity
837
                    th.releasing += quantity
838
                else:
839
                    h.exporting -= quantity
840
                    th.importing -= quantity
841

    
842
                reason = 'REJECT:' + reason[-121:]
843
                self._log_provision(c, h, th, pv, log_time, reason)
844
                h.save()
845
                th.save()
846
                pv.delete()
847
            c.delete()
848

    
849
        return
850

    
851
    def get_pending_commissions(self, context={}, clientkey=None):
852
        pending = Commission.objects.filter(clientkey=clientkey)\
853
                                    .values_list('serial', flat=True)
854
        return pending
855

    
856
    def resolve_pending_commissions(self,   context={}, clientkey=None,
857
                                            max_serial=None, accept_set=()  ):
858
        accept_set = set(accept_set)
859
        pending = self.get_pending_commissions(context=context, clientkey=clientkey)
860
        pending = sorted(pending)
861

    
862
        accept = self.accept_commission
863
        reject = self.reject_commission
864

    
865
        for serial in pending:
866
            if serial > max_serial:
867
                break
868

    
869
            if serial in accept_set:
870
                accept(context=context, clientkey=clientkey, serials=[serial])
871
            else:
872
                reject(context=context, clientkey=clientkey, serials=[serial])
873

    
874
        return
875

    
876
    def release_entity(self, context={}, release_entity=()):
877
        rejected = []
878
        append = rejected.append
879
        for entity, key in release_entity:
880
            try:
881
                e = db_get_entity(entity=entity, key=key, for_update=True)
882
            except Entity.DoesNotExist:
883
                append(entity)
884
                continue
885

    
886
            if e.entities.count() != 0:
887
                append(entity)
888
                continue
889

    
890
            if e.holding_set.count() != 0:
891
                append(entity)
892
                continue
893

    
894
            e.delete()
895

    
896
        if rejected:
897
            raise ReturnButFail(rejected)
898
        return rejected
899

    
900
    def get_timeline(self, context={}, after="", before="Z", get_timeline=()):
901
        entity_set = set()
902
        e_add = entity_set.add
903
        resource_set = set()
904
        r_add = resource_set.add
905

    
906
        for entity, resource, key in get_timeline:
907
            if entity not in entity_set:
908
                try:
909
                    e = Entity.objects.get(entity=entity, key=key)
910
                    e_add(entity)
911
                except Entity.DoesNotExist:
912
                    continue
913

    
914
            r_add((entity, resource))
915

    
916
        chunk_size = 65536
917
        nr = 0
918
        timeline = []
919
        append = timeline.append
920
        filterlogs = ProvisionLog.objects.filter
921
        if entity_set:
922
            q_entity = Q(source__in = entity_set) | Q(target__in = entity_set)
923
        else:
924
            q_entity = Q()
925

    
926
        while 1:
927
            logs = filterlogs(  q_entity,
928
                                issue_time__gt      =   after,
929
                                issue_time__lte     =   before,
930
                                reason__startswith  =   'ACCEPT:'   )
931

    
932
            logs = logs.order_by('issue_time')
933
            #logs = logs.values()
934
            logs = logs[:chunk_size]
935
            nr += len(logs)
936
            if not logs:
937
                break
938
            for g in logs:
939
                if ((g.source, g.resource) not in resource_set
940
                    or (g.target, g.resource) not in resource_set):
941
                        continue
942

    
943
                o = {
944
                    'serial'                    :   g.serial,
945
                    'source'                    :   g.source,
946
                    'target'                    :   g.target,
947
                    'resource'                  :   g.resource,
948
                    'name'                      :   g.name,
949
                    'quantity'                  :   g.delta_quantity,
950
                    'source_allocated'          :   g.source_allocated(),
951
                    'source_allocated_through'  :   g.source_allocated_through(),
952
                    'source_inbound'            :   g.source_inbound(),
953
                    'source_inbound_through'    :   g.source_inbound_through(),
954
                    'source_outbound'           :   g.source_outbound(),
955
                    'source_outbound_through'   :   g.source_outbound_through(),
956
                    'target_allocated'          :   g.target_allocated(),
957
                    'target_allocated_through'  :   g.target_allocated_through(),
958
                    'target_inbound'            :   g.target_inbound(),
959
                    'target_inbound_through'    :   g.target_inbound_through(),
960
                    'target_outbound'           :   g.target_outbound(),
961
                    'target_outbound_through'   :   g.target_outbound_through(),
962
                    'issue_time'                :   g.issue_time,
963
                    'log_time'                  :   g.log_time,
964
                    'reason'                    :   g.reason,
965
                }
966

    
967
                append(o)
968

    
969
            after = g.issue_time
970
            if after >= before:
971
                break
972

    
973
        return timeline
974

    
975
def _add(x, y, invert=False):
976
    if invert and y is None:
977
        return 0
978
    if x is None or y is None:
979
        return None
980
    return x + y if not invert else x - y
981

    
982
def _update(dest, source, attr, delta):
983
    dest_attr = getattr(dest, attr)
984
    dest_attr = _add(getattr(source, attr, 0), delta)
985

    
986
def _isneg(x):
987
    if x is None:
988
        return False
989
    return x < 0
990

    
991
API_Callpoint = QuotaholderDjangoDBCallpoint
992