Statistics
| Branch: | Tag: | Revision:

root / snf-quotaholder-app / quotaholder_django / quotaholder_app / callpoint.py @ d551c841

History | View | Annotate | Download (37.4 kB)

1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from synnefo.lib.quotaholder.api import (
35
                            QuotaholderAPI,
36
                            QH_PRACTICALLY_INFINITE,
37
                            InvalidKeyError, NoEntityError,
38
                            NoQuantityError, NoCapacityError,
39
                            ExportLimitError, ImportLimitError,
40
                            DuplicateError)
41

    
42
from synnefo.lib.commissioning import \
43
    Callpoint, CorruptedError, InvalidDataError, ReturnButFail
44
from synnefo.lib.commissioning.utils.newname import newname
45

    
46
from django.db.models import Q
47
from django.db import transaction, IntegrityError
48
from .models import (Holder, Entity, Policy, Holding,
49
                     Commission, Provision, ProvisionLog, CallSerial,
50
                     now,
51
                     db_get_entity, db_get_holding, db_get_policy,
52
                     db_get_commission, db_filter_provision, db_get_callserial)
53

    
54
class QuotaholderDjangoDBCallpoint(Callpoint):
55

    
56
    api_spec = QuotaholderAPI()
57

    
58
    http_exc_lookup = {
59
        CorruptedError:   550,
60
        InvalidDataError: 400,
61
        InvalidKeyError:  401,
62
        NoEntityError:    404,
63
        NoQuantityError:  413,
64
        NoCapacityError:  413,
65
    }
66

    
67
    def init_connection(self, connection):
68
        if connection is not None:
69
            raise ValueError("Cannot specify connection args with %s" %
70
                             type(self).__name__)
71
        pass
72

    
73
    def commit(self):
74
        transaction.commit()
75

    
76
    def rollback(self):
77
        transaction.rollback()
78

    
79
    def do_make_call(self, call_name, data):
80
        call_fn = getattr(self, call_name, None)
81
        if not call_fn:
82
            m = "cannot find call '%s'" % (call_name,)
83
            raise CorruptedError(m)
84

    
85
        return call_fn(**data)
86

    
87
    def create_entity(self, context={}, create_entity=()):
88
        rejected = []
89
        append = rejected.append
90

    
91
        for idx, (entity, owner, key, ownerkey) in enumerate(create_entity):
92
            try:
93
                owner = Entity.objects.get(entity=owner, key=ownerkey)
94
            except Entity.DoesNotExist:
95
                append(idx)
96
                continue
97

    
98
            try:
99
                e = Entity.objects.get(entity=entity)
100
                append(idx)
101
            except Entity.DoesNotExist:
102
                e = Entity.objects.create(entity=entity,
103
                                          owner=owner,
104
                                          key=key)
105

    
106
        if rejected:
107
            raise ReturnButFail(rejected)
108
        return rejected
109

    
110
    def set_entity_key(self, context={}, set_entity_key=()):
111
        rejected = []
112
        append = rejected.append
113

    
114
        for entity, key, newkey in set_entity_key:
115
            try:
116
                e = db_get_entity(entity=entity, key=key, for_update=True)
117
            except Entity.DoesNotExist:
118
                append(entity)
119
                continue
120

    
121
            e.key = newkey
122
            e.save()
123

    
124
        if rejected:
125
            raise ReturnButFail(rejected)
126
        return rejected
127

    
128
    def list_entities(self, context={}, entity=None, key=None):
129
        try:
130
            e = Entity.objects.get(entity=entity, key=key)
131
        except Entity.DoesNotExist:
132
            m = "Entity '%s' does not exist" % (entity,)
133
            raise NoEntityError(m)
134

    
135
        children = e.entities.all()
136
        entities = [e.entity for e in children]
137
        return entities
138

    
139
    def get_entity(self, context={}, get_entity=()):
140
        entities = []
141
        append = entities.append
142

    
143
        for entity, key in get_entity:
144
            try:
145
                e = Entity.objects.get(entity=entity, key=key)
146
            except Entity.DoesNotExist:
147
                continue
148

    
149
            append((entity, e.owner.entity))
150

    
151
        return entities
152

    
153
    def get_limits(self, context={}, get_limits=()):
154
        limits = []
155
        append = limits.append
156

    
157
        for policy in get_limits:
158
            try:
159
                p = Policy.objects.get(policy=policy)
160
            except Policy.DoesNotExist:
161
                continue
162

    
163
            append((policy, p.quantity, p.capacity,
164
                    p.import_limit, p.export_limit))
165

    
166
        return limits
167

    
168
    def set_limits(self, context={}, set_limits=()):
169

    
170
        for (   policy, quantity, capacity,
171
                import_limit, export_limit  ) in set_limits:
172

    
173
                try:
174
                    policy = db_get_policy(policy=policy, for_update=True)
175
                except Policy.DoesNotExist:
176
                    Policy.objects.create(  policy=policy,
177
                                            quantity=quantity,
178
                                            capacity=capacity,
179
                                            import_limit=import_limit,
180
                                            export_limit=export_limit   )
181
                else:
182
                    policy.quantity = quantity
183
                    policy.capacity = capacity
184
                    policy.export_limit = export_limit
185
                    policy.import_limit = import_limit
186
                    policy.save()
187

    
188
        return ()
189

    
190
    def get_holding(self, context={}, get_holding=()):
191
        holdings = []
192
        append = holdings.append
193

    
194
        for entity, resource, key in get_holding:
195
            try:
196
                h = Holding.objects.get(entity=entity, resource=resource)
197
            except Holding.DoesNotExist:
198
                continue
199

    
200
            if h.entity.key != key:
201
                continue
202

    
203
            append((h.entity.entity, h.resource, h.policy.policy,
204
                    h.imported, h.exported,
205
                    h.returned, h.released, h.flags))
206

    
207
        return holdings
208

    
209
    def _set_holding(self, entity, resource, policy, flags):
210
        try:
211
            h = db_get_holding(entity=entity, resource=resource,
212
                               for_update=True)
213
            h.policy = p
214
            h.flags = flags
215
            h.save()
216
        except Holding.DoesNotExist:
217
            h = Holding.objects.create( entity=e, resource=resource,
218
                                        policy=p, flags=flags      )
219
        return h
220

    
221
    def set_holding(self, context={}, set_holding=()):
222
        rejected = []
223
        append = rejected.append
224

    
225
        for entity, resource, key, policy, flags in set_holding:
226
            try:
227
                e = Entity.objects.get(entity=entity, key=key)
228
            except Entity.DoesNotExist:
229
                append((entity, resource, policy))
230
                continue
231

    
232
            if e.key != key:
233
                append((entity, resource, policy))
234
                continue
235

    
236
            try:
237
                p = Policy.objects.get(policy=policy)
238
            except Policy.DoesNotExist:
239
                append((entity, resource, policy))
240
                continue
241

    
242
            try:
243
                h = db_get_holding(entity=entity, resource=resource,
244
                                   for_update=True)
245
                h.policy = p
246
                h.flags = flags
247
                h.save()
248
            except Holding.DoesNotExist:
249
                h = Holding.objects.create( entity=e, resource=resource,
250
                                            policy=p, flags=flags      )
251

    
252
        if rejected:
253
            raise ReturnButFail(rejected)
254
        return rejected
255

    
256
    def _init_holding(self, entity, resource, policy,
257
                          imported, exported, returned, released,
258
                          flags):
259
        try:
260
            h = db_get_holding(entity=entity, resource=resource,
261
                               for_update=True)
262
        except Holding.DoesNotExist:
263
            h = Holding(entity=entity, resource=resource)
264

    
265
        h.policy = policy
266
        h.flags = flags
267
        h.imported=imported
268
        h.importing=imported
269
        h.exported=exported
270
        h.exporting=exported
271
        h.returned=returned
272
        h.returning=returned
273
        h.released=released
274
        h.releasing=released
275
        h.save()
276

    
277
    def init_holding(self, context={}, init_holding=()):
278
        rejected = []
279
        append = rejected.append
280

    
281
        for idx, sfh in enumerate(init_holding):
282
            (entity, resource, key, policy,
283
             imported, exported, returned, released,
284
             flags) = sfh
285
            try:
286
                e = Entity.objects.get(entity=entity, key=key)
287
            except Entity.DoesNotExist:
288
                append(idx)
289
                continue
290

    
291
            if e.key != key:
292
                append(idx)
293
                continue
294

    
295
            try:
296
                p = Policy.objects.get(policy=policy)
297
            except Policy.DoesNotExist:
298
                append(idx)
299
                continue
300

    
301
            self._init_holding(e, resource, p,
302
                                   imported, exported,
303
                                   returned, released,
304
                                   flags)
305
        if rejected:
306
            raise ReturnButFail(rejected)
307
        return rejected
308

    
309
    def reset_holding(self, context={}, reset_holding=()):
310
        rejected = []
311
        append = rejected.append
312

    
313
        for idx, tpl in enumerate(reset_holding):
314
            (entity, resource, key,
315
             imported, exported, returned, released) = tpl
316
            try:
317
                e = Entity.objects.get(entity=entity, key=key)
318
            except Entity.DoesNotExist:
319
                append(idx)
320
                continue
321

    
322
            try:
323
                h = db_get_holding(entity=entity, resource=resource,
324
                                   for_update=True)
325
                h.imported=imported
326
                h.importing=imported
327
                h.exported=exported
328
                h.exporting=exported
329
                h.returned=returned
330
                h.returning=returned
331
                h.released=released
332
                h.releasing=released
333
                h.save()
334
            except Holding.DoesNotExist:
335
                append(idx)
336
                continue
337

    
338
        if rejected:
339
            raise ReturnButFail(rejected)
340
        return rejected
341

    
342
    def _check_pending(self, entity, resource):
343
        cs = Commission.objects.filter(entity=entity)
344
        cs = [c for c in cs if c.provisions.filter(resource=resource)]
345
        as_target = [c.serial for c in cs]
346

    
347
        ps = Provision.objects.filter(entity=entity, resource=resource)
348
        as_source = [p.serial.serial for p in ps]
349

    
350
        return as_target + as_source
351

    
352
    def _actual_quantity(self, holding):
353
        hp = holding.policy
354
        return hp.quantity + (holding.imported + holding.returned -
355
                              holding.exported - holding.released)
356

    
357
    def _new_policy_name(self):
358
        return newname('policy_')
359

    
360
    def _increase_resource(self, entity, resource, amount):
361
        try:
362
            h = db_get_holding(entity=entity, resource=resource,
363
                               for_update=True)
364
        except Holding.DoesNotExist:
365
            h = Holding(entity=entity, resource=resource)
366
            p = Policy.objects.create(policy=self._new_policy_name(),
367
                                      quantity=0,
368
                                      capacity=QH_PRACTICALLY_INFINITE,
369
                                      import_limit=QH_PRACTICALLY_INFINITE,
370
                                      export_limit=QH_PRACTICALLY_INFINITE)
371
            h.policy = p
372
        h.imported += amount
373
        h.save()
374

    
375
    def release_holding(self, context={}, release_holding=()):
376
        rejected = []
377
        append = rejected.append
378

    
379
        for idx, (entity, resource, key) in enumerate(release_holding):
380
            try:
381
                h = db_get_holding(entity=entity, resource=resource,
382
                                   for_update=True)
383
            except Holding.DoesNotExist:
384
                append(idx)
385
                continue
386

    
387
            if h.entity.key != key:
388
                append(idx)
389
                continue
390

    
391
            if self._check_pending(entity, resource):
392
                append(idx)
393
                continue
394

    
395
            q = self._actual_quantity(h)
396
            if q > 0:
397
                owner = h.entity.owner
398
                self._increase_resource(owner, resource, q)
399

    
400
            h.delete()
401

    
402
        if rejected:
403
            raise ReturnButFail(rejected)
404
        return rejected
405

    
406
    def list_resources(self, context={}, entity=None, key=None):
407
        try:
408
            e = Entity.objects.get(entity=entity)
409
        except Entity.DoesNotExist:
410
            m = "No such entity '%s'" % (entity,)
411
            raise NoEntityError(m)
412

    
413
        if e.key != key:
414
            m = "Invalid key for entity '%s'" % (entity,)
415
            raise InvalidKeyError(m)
416

    
417
        holdings = e.holding_set.filter(entity=entity)
418
        resources = [h.resource for h in holdings]
419
        return resources
420

    
421
    def list_holdings(self, context={}, list_holdings=()):
422
        rejected = []
423
        reject = rejected.append
424
        holdings_list = []
425
        append = holdings_list.append
426

    
427
        for entity, key in list_holdings:
428
            try:
429
                e = Entity.objects.get(entity=entity)
430
                if e.key != key:
431
                    raise Entity.DoesNotExist("wrong key")
432
            except Entity.DoesNotExist:
433
                reject(entity)
434
                continue
435

    
436
            holdings = e.holding_set.filter(entity=entity)
437
            append([[entity, h.resource,
438
                     h.imported, h.exported, h.returned, h.released]
439
                        for h in holdings])
440

    
441
        return holdings_list, rejected
442

    
443
    def get_quota(self, context={}, get_quota=()):
444
        quotas = []
445
        append = quotas.append
446

    
447
        for entity, resource, key in get_quota:
448
            try:
449
                h = Holding.objects.get(entity=entity, resource=resource)
450
            except Holding.DoesNotExist:
451
                continue
452

    
453
            if h.entity.key != key:
454
                continue
455

    
456
            p = h.policy
457

    
458
            append((h.entity.entity, h.resource, p.quantity, p.capacity,
459
                    p.import_limit, p.export_limit,
460
                    h.imported, h.exported,
461
                    h.returned, h.released,
462
                    h.flags))
463

    
464
        return quotas
465

    
466
    def set_quota(self, context={}, set_quota=()):
467
        rejected = []
468
        append = rejected.append
469

    
470
        for (   entity, resource, key,
471
                quantity, capacity,
472
                import_limit, export_limit, flags  ) in set_quota:
473

    
474
                try:
475
                    e = Entity.objects.get(entity=entity, key=key)
476
                except Entity.DoesNotExist:
477
                    append((entity, resource))
478
                    continue
479

    
480
                policy = newname('policy_')
481
                newp = Policy   (
482
                            policy=policy,
483
                            quantity=quantity,
484
                            capacity=capacity,
485
                            import_limit=import_limit,
486
                            export_limit=export_limit
487
                )
488

    
489
                try:
490
                    h = db_get_holding(entity=entity, resource=resource,
491
                                       for_update=True)
492
                    p = h.policy
493
                    h.policy = newp
494
                    h.flags = flags
495
                except Holding.DoesNotExist:
496
                    h = Holding(entity=e, resource=resource,
497
                                policy=newp, flags=flags)
498
                    p = None
499

    
500
                # the order is intentionally reversed so that it
501
                # would break if we are not within a transaction.
502
                # Has helped before.
503
                h.save()
504
                newp.save()
505

    
506
                if p is not None and p.holding_set.count() == 0:
507
                    p.delete()
508

    
509
        if rejected:
510
            raise ReturnButFail(rejected)
511
        return rejected
512

    
513
    def add_quota(self, context={}, clientkey=None, serial=None,
514
                  sub_quota=(), add_quota=()):
515
        rejected = []
516
        append = rejected.append
517

    
518
        if serial is not None:
519
            if clientkey is None:
520
                all_pairs = [(q[0], q[1]) for q in sub_quota + add_quota]
521
                raise ReturnButFail(all_pairs)
522
            try:
523
                cs = CallSerial.objects.get(serial=serial, clientkey=clientkey)
524
                all_pairs = [(q[0], q[1]) for q in sub_quota + add_quota]
525
                raise ReturnButFail(all_pairs)
526
            except CallSerial.DoesNotExist:
527
                pass
528

    
529
        for removing, source in [(True, sub_quota), (False, add_quota)]:
530
            for (   entity, resource, key,
531
                    quantity, capacity,
532
                    import_limit, export_limit ) in source:
533

    
534
                    try:
535
                        e = Entity.objects.get(entity=entity, key=key)
536
                    except Entity.DoesNotExist:
537
                        append((entity, resource))
538
                        continue
539

    
540
                    try:
541
                        h = db_get_holding(entity=entity, resource=resource,
542
                                           for_update=True)
543
                        p = h.policy
544
                    except Holding.DoesNotExist:
545
                        if removing:
546
                            append((entity, resource))
547
                            continue
548
                        h = Holding(entity=e, resource=resource, flags=0)
549
                        p = None
550

    
551
                    policy = newname('policy_')
552
                    newp = Policy(policy=policy)
553

    
554
                    newp.quantity = _add(p.quantity if p else 0, quantity,
555
                                         invert=removing)
556
                    newp.capacity = _add(p.capacity if p else 0, capacity,
557
                                         invert=removing)
558
                    newp.import_limit = _add(p.import_limit if p else 0,
559
                                                  import_limit, invert=removing)
560
                    newp.export_limit = _add(p.export_limit if p else 0,
561
                                                  export_limit, invert=removing)
562

    
563
                    new_values = [newp.capacity,
564
                                  newp.import_limit, newp.export_limit]
565
                    if any(map(_isneg, new_values)):
566
                        append((entity, resource))
567
                        continue
568

    
569
                    h.policy = newp
570

    
571
                    # the order is intentionally reversed so that it
572
                    # would break if we are not within a transaction.
573
                    # Has helped before.
574
                    h.save()
575
                    newp.save()
576

    
577
                    if p is not None and p.holding_set.count() == 0:
578
                        p.delete()
579

    
580
        if rejected:
581
            raise ReturnButFail(rejected)
582

    
583
        if serial is not None and clientkey is not None:
584
            CallSerial.objects.create(serial=serial, clientkey=clientkey)
585
        return rejected
586

    
587
    def ack_serials(self, context={}, clientkey=None, serials=()):
588
        if clientkey is None:
589
            return
590

    
591
        for serial in serials:
592
            try:
593
                c = db_get_callserial(clientkey=clientkey,
594
                                      serial=serial,
595
                                      for_update=True)
596
                c.delete()
597
            except CallSerial.DoesNotExist:
598
                pass
599
        return
600

    
601
    def query_serials(self, context={}, clientkey=None, serials=()):
602
        result = []
603
        append = result.append
604

    
605
        if clientkey is None:
606
            return result
607

    
608
        if not serials:
609
            cs = CallSerial.objects.filter(clientkey=clientkey)
610
            return [c.serial for c in cs]
611

    
612
        for serial in serials:
613
            try:
614
                db_get_callserial(clientkey=clientkey, serial=serial)
615
                append(serial)
616
            except CallSerial.DoesNotExist:
617
                pass
618

    
619
        return result
620

    
621
    def issue_commission(self,  context     =   {},
622
                                clientkey   =   None,
623
                                target      =   None,
624
                                key         =   None,
625
                                name        =   None,
626
                                provisions  =   ()  ):
627

    
628
        try:
629
            t = Entity.objects.get(entity=target)
630
        except Entity.DoesNotExist:
631
            m = "No target entity '%s'" % (target,)
632
            raise NoEntityError(m)
633
        else:
634
            if t.key != key:
635
                m = "Invalid key for target entity '%s'" % (target,)
636
                raise InvalidKeyError(m)
637

    
638
        create = Commission.objects.create
639
        commission = create(entity_id=target, clientkey=clientkey, name=name)
640
        serial = commission.serial
641

    
642
        checked = []
643
        for entity, resource, quantity in provisions:
644

    
645
            if entity == target:
646
                m = "Cannot issue commission from an entity to itself (%s)" % (
647
                    entity,)
648
                raise InvalidDataError(m)
649

    
650
            ent_res = entity, resource
651
            if ent_res in checked:
652
                m = "Duplicate provision for %s.%s" % ent_res
653
                raise DuplicateError(m)
654
            checked.append(ent_res)
655

    
656
            try:
657
                e = Entity.objects.get(entity=entity)
658
            except Entity.DoesNotExist:
659
                m = "No source entity '%s'" % (entity,)
660
                raise NoEntityError(m)
661

    
662
            release = 0
663
            if quantity < 0:
664
                release = 1
665

    
666
            # Source limits checks
667
            try:
668
                h = db_get_holding(entity=entity, resource=resource,
669
                                   for_update=True)
670
            except Holding.DoesNotExist:
671
                m = ("There is no quantity "
672
                     "to allocate from in %s.%s" % (entity, resource))
673
                raise NoQuantityError(m,
674
                                      source=entity, target=target,
675
                                      resource=resource, requested=quantity,
676
                                      current=0, limit=0)
677

    
678
            hp = h.policy
679

    
680
            if not release:
681
                current = h.exporting
682
                limit = hp.export_limit
683
                if current + quantity > limit:
684
                    m = ("Export limit reached for %s.%s" % (entity, resource))
685
                    raise ExportLimitError(m,
686
                                           source=entity, target=target,
687
                                           resource=resource, requested=quantity,
688
                                           current=current, limit=limit)
689

    
690
                limit = hp.quantity + h.imported - h.releasing
691
                unavailable = h.exporting - h.returned
692
                available = limit - unavailable
693

    
694
                if quantity > available:
695
                    m = ("There is not enough quantity "
696
                         "to allocate from in %s.%s" % (entity, resource))
697
                    raise NoQuantityError(m,
698
                                          source=entity, target=target,
699
                                          resource=resource, requested=quantity,
700
                                          current=unavailable, limit=limit)
701
            else:
702
                current = (+ h.importing + h.returning
703
                           - h.exported  - h.returned)
704
                limit = hp.capacity
705
                if current - quantity > limit:
706
                    m = ("There is not enough capacity "
707
                         "to release to in %s.%s" % (entity, resource))
708
                    raise NoQuantityError(m,
709
                                          source=entity, target=target,
710
                                          resource=resource, requested=quantity,
711
                                          current=current, limit=limit)
712

    
713
            # Target limits checks
714
            try:
715
                th = db_get_holding(entity=target, resource=resource,
716
                                    for_update=True)
717
            except Holding.DoesNotExist:
718
                m = ("There is no capacity "
719
                     "to allocate into in %s.%s" % (target, resource))
720
                raise NoCapacityError(m,
721
                                      source=entity, target=target,
722
                                      resource=resource, requested=quantity,
723
                                      current=0, limit=0)
724

    
725
            tp = th.policy
726

    
727
            if not release:
728
                limit = tp.import_limit
729
                current = th.importing
730
                if current + quantity > limit:
731
                    m = ("Import limit reached for %s.%s" % (target, resource))
732
                    raise ImportLimitError(m,
733
                                           source=entity, target=target,
734
                                           resource=resource, requested=quantity,
735
                                           current=current, limit=limit)
736

    
737
                current = (+ th.importing + th.returning
738
                           - th.exported - th.released)
739

    
740
                if current + quantity > tp.quantity + tp.capacity:
741
                    m = ("There is not enough capacity "
742
                         "to allocate into in %s.%s" % (target, resource))
743
                    raise NoCapacityError(m,
744
                                          source=entity, target=target,
745
                                          resource=resource, requested=quantity,
746
                                          current=current, limit=limit)
747
            else:
748
                limit = tp.quantity + th.imported - th.releasing
749
                unavailable = th.exporting - th.returned
750
                available = limit - unavailable
751

    
752
                if available + quantity < 0:
753
                    m = ("There is not enough quantity "
754
                         "to release from in %s.%s" % (target, resource))
755
                    raise NoCapacityError(m,
756
                                          source=entity, target=target,
757
                                          resource=resource, requested=quantity,
758
                                          current=unavailable, limit=limit)
759

    
760
            Provision.objects.create(   serial      =   commission,
761
                                        entity      =   e,
762
                                        resource    =   resource,
763
                                        quantity    =   quantity   )
764
            if release:
765
                h.returning -= quantity
766
                th.releasing -= quantity
767
            else:
768
                h.exporting += quantity
769
                th.importing += quantity
770

    
771
            h.save()
772
            th.save()
773

    
774
        return serial
775

    
776
    def _log_provision(self, commission, s_holding, t_holding,
777
                             provision, log_time, reason):
778

    
779
        s_entity = s_holding.entity
780
        s_policy = s_holding.policy
781
        t_entity = t_holding.entity
782
        t_policy = t_holding.policy
783

    
784
        ProvisionLog.objects.create(
785
                        serial              =   commission.serial,
786
                        name                =   commission.name,
787
                        source              =   s_entity.entity,
788
                        target              =   t_entity.entity,
789
                        resource            =   provision.resource,
790
                        source_quantity     =   s_policy.quantity,
791
                        source_capacity     =   s_policy.capacity,
792
                        source_import_limit =   s_policy.import_limit,
793
                        source_export_limit =   s_policy.export_limit,
794
                        source_imported     =   s_holding.imported,
795
                        source_exported     =   s_holding.exported,
796
                        source_returned     =   s_holding.returned,
797
                        source_released     =   s_holding.released,
798
                        target_quantity     =   t_policy.quantity,
799
                        target_capacity     =   t_policy.capacity,
800
                        target_import_limit =   t_policy.import_limit,
801
                        target_export_limit =   t_policy.export_limit,
802
                        target_imported     =   t_holding.imported,
803
                        target_exported     =   t_holding.exported,
804
                        target_returned     =   t_holding.returned,
805
                        target_released     =   t_holding.released,
806
                        delta_quantity      =   provision.quantity,
807
                        issue_time          =   commission.issue_time,
808
                        log_time            =   log_time,
809
                        reason              =   reason)
810

    
811
    def accept_commission(self, context={}, clientkey=None,
812
                                serials=(), reason=''):
813
        log_time = now()
814

    
815
        for serial in serials:
816
            try:
817
                c = db_get_commission(clientkey=clientkey, serial=serial,
818
                                      for_update=True)
819
            except Commission.DoesNotExist:
820
                return
821

    
822
            t = c.entity
823

    
824
            provisions = db_filter_provision(serial=serial, for_update=True)
825
            for pv in provisions:
826
                try:
827
                    h = db_get_holding(entity=pv.entity.entity,
828
                                       resource=pv.resource, for_update=True)
829
                    th = db_get_holding(entity=t, resource=pv.resource,
830
                                        for_update=True)
831
                except Holding.DoesNotExist:
832
                    m = "Corrupted provision"
833
                    raise CorruptedError(m)
834

    
835
                quantity = pv.quantity
836
                release = 0
837
                if quantity < 0:
838
                    release = 1
839

    
840
                if release:
841
                    h.returned -= quantity
842
                    th.released -= quantity
843
                else:
844
                    h.exported += quantity
845
                    th.imported += quantity
846

    
847
                reason = 'ACCEPT:' + reason[-121:]
848
                self._log_provision(c, h, th, pv, log_time, reason)
849
                h.save()
850
                th.save()
851
                pv.delete()
852
            c.delete()
853

    
854
        return
855

    
856
    def reject_commission(self, context={}, clientkey=None,
857
                                serials=(), reason=''):
858
        log_time = now()
859

    
860
        for serial in serials:
861
            try:
862
                c = db_get_commission(clientkey=clientkey, serial=serial,
863
                                      for_update=True)
864
            except Commission.DoesNotExist:
865
                return
866

    
867
            t = c.entity
868

    
869
            provisions = db_filter_provision(serial=serial, for_update=True)
870
            for pv in provisions:
871
                try:
872
                    h = db_get_holding(entity=pv.entity.entity,
873
                                       resource=pv.resource, for_update=True)
874
                    th = db_get_holding(entity=t, resource=pv.resource,
875
                                        for_update=True)
876
                except Holding.DoesNotExist:
877
                    m = "Corrupted provision"
878
                    raise CorruptedError(m)
879

    
880
                quantity = pv.quantity
881
                release = 0
882
                if quantity < 0:
883
                    release = 1
884

    
885
                if release:
886
                    h.returning += quantity
887
                    th.releasing += quantity
888
                else:
889
                    h.exporting -= quantity
890
                    th.importing -= quantity
891

    
892
                reason = 'REJECT:' + reason[-121:]
893
                self._log_provision(c, h, th, pv, log_time, reason)
894
                h.save()
895
                th.save()
896
                pv.delete()
897
            c.delete()
898

    
899
        return
900

    
901
    def get_pending_commissions(self, context={}, clientkey=None):
902
        pending = Commission.objects.filter(clientkey=clientkey)\
903
                                    .values_list('serial', flat=True)
904
        return pending
905

    
906
    def resolve_pending_commissions(self,   context={}, clientkey=None,
907
                                            max_serial=None, accept_set=()  ):
908
        accept_set = set(accept_set)
909
        pending = self.get_pending_commissions(context=context, clientkey=clientkey)
910
        pending = sorted(pending)
911

    
912
        accept = self.accept_commission
913
        reject = self.reject_commission
914

    
915
        for serial in pending:
916
            if serial > max_serial:
917
                break
918

    
919
            if serial in accept_set:
920
                accept(context=context, clientkey=clientkey, serials=[serial])
921
            else:
922
                reject(context=context, clientkey=clientkey, serials=[serial])
923

    
924
        return
925

    
926
    def release_entity(self, context={}, release_entity=()):
927
        rejected = []
928
        append = rejected.append
929
        for entity, key in release_entity:
930
            try:
931
                e = db_get_entity(entity=entity, key=key, for_update=True)
932
            except Entity.DoesNotExist:
933
                append(entity)
934
                continue
935

    
936
            if e.entities.count() != 0:
937
                append(entity)
938
                continue
939

    
940
            if e.holding_set.count() != 0:
941
                append(entity)
942
                continue
943

    
944
            e.delete()
945

    
946
        if rejected:
947
            raise ReturnButFail(rejected)
948
        return rejected
949

    
950
    def get_timeline(self, context={}, after="", before="Z", get_timeline=()):
951
        entity_set = set()
952
        e_add = entity_set.add
953
        resource_set = set()
954
        r_add = resource_set.add
955

    
956
        for entity, resource, key in get_timeline:
957
            if entity not in entity_set:
958
                try:
959
                    e = Entity.objects.get(entity=entity, key=key)
960
                    e_add(entity)
961
                except Entity.DoesNotExist:
962
                    continue
963

    
964
            r_add((entity, resource))
965

    
966
        chunk_size = 65536
967
        nr = 0
968
        timeline = []
969
        append = timeline.append
970
        filterlogs = ProvisionLog.objects.filter
971
        if entity_set:
972
            q_entity = Q(source__in = entity_set) | Q(target__in = entity_set)
973
        else:
974
            q_entity = Q()
975

    
976
        while 1:
977
            logs = filterlogs(  q_entity,
978
                                issue_time__gt      =   after,
979
                                issue_time__lte     =   before,
980
                                reason__startswith  =   'ACCEPT:'   )
981

    
982
            logs = logs.order_by('issue_time')
983
            #logs = logs.values()
984
            logs = logs[:chunk_size]
985
            nr += len(logs)
986
            if not logs:
987
                break
988
            for g in logs:
989
                if ((g.source, g.resource) not in resource_set
990
                    or (g.target, g.resource) not in resource_set):
991
                        continue
992

    
993
                o = {
994
                    'serial'                    :   g.serial,
995
                    'source'                    :   g.source,
996
                    'target'                    :   g.target,
997
                    'resource'                  :   g.resource,
998
                    'name'                      :   g.name,
999
                    'quantity'                  :   g.delta_quantity,
1000
                    'source_allocated'          :   g.source_allocated(),
1001
                    'source_allocated_through'  :   g.source_allocated_through(),
1002
                    'source_inbound'            :   g.source_inbound(),
1003
                    'source_inbound_through'    :   g.source_inbound_through(),
1004
                    'source_outbound'           :   g.source_outbound(),
1005
                    'source_outbound_through'   :   g.source_outbound_through(),
1006
                    'target_allocated'          :   g.target_allocated(),
1007
                    'target_allocated_through'  :   g.target_allocated_through(),
1008
                    'target_inbound'            :   g.target_inbound(),
1009
                    'target_inbound_through'    :   g.target_inbound_through(),
1010
                    'target_outbound'           :   g.target_outbound(),
1011
                    'target_outbound_through'   :   g.target_outbound_through(),
1012
                    'issue_time'                :   g.issue_time,
1013
                    'log_time'                  :   g.log_time,
1014
                    'reason'                    :   g.reason,
1015
                }
1016

    
1017
                append(o)
1018

    
1019
            after = g.issue_time
1020
            if after >= before:
1021
                break
1022

    
1023
        return timeline
1024

    
1025
def _add(x, y, invert=False):
1026
    return x + y if not invert else x - y
1027

    
1028
def _update(dest, source, attr, delta):
1029
    dest_attr = getattr(dest, attr)
1030
    dest_attr = _add(getattr(source, attr, 0), delta)
1031

    
1032
def _isneg(x):
1033
    return x < 0
1034

    
1035
API_Callpoint = QuotaholderDjangoDBCallpoint
1036