Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / quotaholder / callpoint.py @ 04dcc30e

History | View | Annotate | Download (20.9 kB)

1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from astakos.quotaholder.exception import (
35
    QuotaholderError,
36
    CorruptedError, InvalidDataError,
37
    NoStockError, NoCapacityError,
38
    DuplicateError)
39

    
40
from astakos.quotaholder.commission import (
41
    Import, Export, Reclaim, Release, Operations)
42

    
43
from astakos.quotaholder.utils.newname import newname
44
from astakos.quotaholder.api import QH_PRACTICALLY_INFINITE
45

    
46
from django.db.models import Q, Count
47
from django.db.models import Q
48
from .models import (Holding,
49
                     Commission, Provision, ProvisionLog,
50
                     now,
51
                     db_get_holding,
52
                     db_get_commission, db_filter_provision)
53

    
54

    
55
class QuotaholderDjangoDBCallpoint(object):
56

    
57
    def _init_holding(self,
58
                      holder, resource, capacity,
59
                      imported_min, imported_max, stock_min, stock_max,
60
                      flags):
61
        try:
62
            h = db_get_holding(holder=holder, resource=resource,
63
                               for_update=True)
64
        except Holding.DoesNotExist:
65
            h = Holding(holder=holder, resource=resource)
66

    
67
        h.capacity = capacity
68
        h.flags = flags
69
        h.imported_min = imported_min
70
        h.imported_max = imported_max
71
        h.stock_min = stock_min
72
        h.stock_max = stock_max
73
        h.save()
74

    
75
    def init_holding(self, context=None, init_holding=[]):
76
        rejected = []
77
        append = rejected.append
78

    
79
        for idx, sfh in enumerate(init_holding):
80
            (holder, resource, capacity,
81
             imported_min, imported_max, stock_min, stock_max,
82
             flags) = sfh
83

    
84
            self._init_holding(holder, resource, capacity,
85
                               imported_min, imported_max,
86
                               stock_min, stock_max,
87
                               flags)
88
        if rejected:
89
            raise QuotaholderError(rejected)
90
        return rejected
91

    
92
    def reset_holding(self, context=None, reset_holding=[]):
93
        rejected = []
94
        append = rejected.append
95

    
96
        for idx, tpl in enumerate(reset_holding):
97
            (holder, resource,
98
             imported_min, imported_max, stock_min, stock_max) = tpl
99

    
100
            try:
101
                h = db_get_holding(holder=holder, resource=resource,
102
                                   for_update=True)
103
                h.imported_min = imported_min
104
                h.imported_max = imported_max
105
                h.stock_min = stock_min
106
                h.stock_max = stock_max
107
                h.save()
108
            except Holding.DoesNotExist:
109
                append(idx)
110
                continue
111

    
112
        if rejected:
113
            raise QuotaholderError(rejected)
114
        return rejected
115

    
116
    def _check_pending(self, holder, resource):
117
        cs = Commission.objects.filter(holder=holder)
118
        cs = [c for c in cs if c.provisions.filter(resource=resource)]
119
        as_target = [c.serial for c in cs]
120

    
121
        ps = Provision.objects.filter(holder=holder, resource=resource)
122
        as_source = [p.serial.serial for p in ps]
123

    
124
        return as_target + as_source
125

    
126
    def release_holding(self, context=None, release_holding=[]):
127
        rejected = []
128
        append = rejected.append
129

    
130
        for idx, (holder, resource) in enumerate(release_holding):
131
            try:
132
                h = db_get_holding(holder=holder, resource=resource,
133
                                   for_update=True)
134
            except Holding.DoesNotExist:
135
                append(idx)
136
                continue
137

    
138
            if self._check_pending(holder, resource):
139
                append(idx)
140
                continue
141

    
142
            if h.imported_max > 0:
143
                append(idx)
144
                continue
145

    
146
            h.delete()
147

    
148
        if rejected:
149
            raise QuotaholderError(rejected)
150
        return rejected
151

    
152
    def list_resources(self, context=None, holder=None):
153
        holdings = Holding.objects.filter(holder=holder)
154
        resources = [h.resource for h in holdings]
155
        return resources
156

    
157
    def list_holdings(self, context=None, list_holdings=[]):
158
        rejected = []
159
        reject = rejected.append
160
        holdings_list = []
161
        append = holdings_list.append
162

    
163
        for holder in list_holdings:
164
            holdings = list(Holding.objects.filter(holder=holder))
165
            if not holdings:
166
                reject(holder)
167
                continue
168

    
169
            append([(holder, h.resource,
170
                     h.imported_min, h.imported_max, h.stock_min, h.stock_max)
171
                    for h in holdings])
172

    
173
        return holdings_list, rejected
174

    
175
    def get_quota(self, context=None, get_quota=[]):
176
        quotas = []
177
        append = quotas.append
178

    
179
        holders = set(holder for holder, r in get_quota)
180
        hs = Holding.objects.filter(holder__in=holders)
181
        holdings = {}
182
        for h in hs:
183
            holdings[(h.holder, h.resource)] = h
184

    
185
        for holder, resource in get_quota:
186
            try:
187
                h = holdings[(holder, resource)]
188
            except:
189
                continue
190

    
191
            append((h.holder, h.resource, h.capacity,
192
                    h.imported_min, h.imported_max,
193
                    h.stock_min, h.stock_max,
194
                    h.flags))
195

    
196
        return quotas
197

    
198
    def set_quota(self, context=None, set_quota=[]):
199
        rejected = []
200
        append = rejected.append
201

    
202
        q_holdings = Q()
203
        holders = []
204
        for (holder, resource, _, _) in set_quota:
205
            holders.append(holder)
206

    
207
        hs = Holding.objects.filter(holder__in=holders).select_for_update()
208
        holdings = {}
209
        for h in hs:
210
            holdings[(h.holder, h.resource)] = h
211

    
212
        for (holder, resource,
213
             capacity,
214
             flags) in set_quota:
215

    
216
            try:
217
                h = holdings[(holder, resource)]
218
                h.flags = flags
219
            except KeyError:
220
                h = Holding(holder=holder, resource=resource,
221
                            flags=flags)
222

    
223
            h.capacity = capacity
224
            h.save()
225
            holdings[(holder, resource)] = h
226

    
227
        if rejected:
228
            raise QuotaholderError(rejected)
229
        return rejected
230

    
231
    def add_quota(self,
232
                  context=None,
233
                  sub_quota=[], add_quota=[]):
234
        rejected = []
235
        append = rejected.append
236

    
237
        sources = sub_quota + add_quota
238
        q_holdings = Q()
239
        holders = []
240
        for (holder, resource, _) in sources:
241
            holders.append(holder)
242

    
243
        hs = Holding.objects.filter(holder__in=holders).select_for_update()
244
        holdings = {}
245
        for h in hs:
246
            holdings[(h.holder, h.resource)] = h
247

    
248
        for removing, source in [(True, sub_quota), (False, add_quota)]:
249
            for (holder, resource,
250
                 capacity,
251
                 ) in source:
252

    
253
                try:
254
                    h = holdings[(holder, resource)]
255
                    current_capacity = h.capacity
256
                except KeyError:
257
                    if removing:
258
                        append((holder, resource))
259
                        continue
260

    
261
                    h = Holding(holder=holder, resource=resource, flags=0)
262
                    current_capacity = 0
263

    
264
                h.capacity = (current_capacity - capacity if removing else
265
                              current_capacity + capacity)
266

    
267
                if h.capacity < 0:
268
                    append((holder, resource))
269
                    continue
270

    
271
                h.save()
272
                holdings[(holder, resource)] = h
273

    
274
        if rejected:
275
            raise QuotaholderError(rejected)
276

    
277
        return rejected
278

    
279
    def issue_commission(self,
280
                         context=None,
281
                         clientkey=None,
282
                         target=None,
283
                         name=None,
284
                         provisions=()):
285

    
286
        create = Commission.objects.create
287
        commission = create(holder=target, clientkey=clientkey, name=name)
288
        serial = commission.serial
289

    
290
        operations = Operations()
291

    
292
        try:
293
            checked = []
294
            for holder, resource, quantity in provisions:
295

    
296
                if holder == target:
297
                    m = ("Cannot issue commission from a holder "
298
                         "to itself (%s)" % (holder,))
299
                    raise InvalidDataError(m)
300

    
301
                ent_res = holder, resource
302
                if ent_res in checked:
303
                    m = "Duplicate provision for %s.%s" % ent_res
304
                    raise DuplicateError(m)
305
                checked.append(ent_res)
306

    
307
                # Source
308
                try:
309
                    h = (db_get_holding(holder=holder, resource=resource,
310
                                        for_update=True)
311
                         if holder is not None
312
                         else None)
313
                except Holding.DoesNotExist:
314
                    m = ("%s has no stock of %s." % (holder, resource))
315
                    raise NoStockError(m,
316
                                       holder=holder,
317
                                       resource=resource,
318
                                       requested=quantity,
319
                                       current=0,
320
                                       limit=0)
321

    
322
                # Target
323
                try:
324
                    th = db_get_holding(holder=target, resource=resource,
325
                                        for_update=True)
326
                except Holding.DoesNotExist:
327
                    m = ("There is no capacity "
328
                         "to allocate into in %s.%s" % (target, resource))
329
                    raise NoCapacityError(m,
330
                                          holder=holder,
331
                                          resource=resource,
332
                                          requested=quantity,
333
                                          current=0,
334
                                          limit=0)
335

    
336
                if quantity >= 0:
337
                    if h is not None:
338
                        operations.prepare(Export, h, quantity)
339
                    operations.prepare(Import, th, quantity)
340

    
341
                else: # release
342
                    abs_quantity = -quantity
343

    
344
                    if h is not None:
345
                        operations.prepare(Reclaim, h, abs_quantity)
346
                    operations.prepare(Release, th, abs_quantity)
347

    
348
                Provision.objects.create(serial=commission,
349
                                         holder=holder,
350
                                         resource=resource,
351
                                         quantity=quantity)
352

    
353
        except QuotaholderError:
354
            operations.revert()
355
            raise
356

    
357
        return serial
358

    
359
    def _log_provision(self,
360
                       commission, s_holding, t_holding,
361
                       provision, log_time, reason):
362

    
363
        if s_holding is not None:
364
            s_holder = s_holding.holder
365
            s_capacity = s_holding.capacity
366
            s_imported_min = s_holding.imported_min
367
            s_imported_max = s_holding.imported_max
368
            s_stock_min = s_holding.stock_min
369
            s_stock_max = s_holding.stock_max
370
        else:
371
            s_holder = None
372
            s_capacity = None
373
            s_imported_min = None
374
            s_imported_max = None
375
            s_stock_min = None
376
            s_stock_max = None
377

    
378
        kwargs = {
379
            'serial':              commission.serial,
380
            'name':                commission.name,
381
            'source':              s_holder,
382
            'target':              t_holding.holder,
383
            'resource':            provision.resource,
384
            'source_capacity':     s_capacity,
385
            'source_imported_min': s_imported_min,
386
            'source_imported_max': s_imported_max,
387
            'source_stock_min':    s_stock_min,
388
            'source_stock_max':    s_stock_max,
389
            'target_capacity':     t_holding.capacity,
390
            'target_imported_min': t_holding.imported_min,
391
            'target_imported_max': t_holding.imported_max,
392
            'target_stock_min':    t_holding.stock_min,
393
            'target_stock_max':    t_holding.stock_max,
394
            'delta_quantity':      provision.quantity,
395
            'issue_time':          commission.issue_time,
396
            'log_time':            log_time,
397
            'reason':              reason,
398
        }
399

    
400
        ProvisionLog.objects.create(**kwargs)
401

    
402
    def accept_commission(self,
403
                          context=None, clientkey=None,
404
                          serials=[], reason=''):
405
        log_time = now()
406

    
407
        for serial in serials:
408
            try:
409
                c = db_get_commission(clientkey=clientkey, serial=serial,
410
                                      for_update=True)
411
            except Commission.DoesNotExist:
412
                return
413

    
414
            t = c.holder
415

    
416
            operations = Operations()
417

    
418
            provisions = db_filter_provision(serial=serial, for_update=True)
419
            for pv in provisions:
420
                try:
421
                    h = (db_get_holding(holder=pv.holder,
422
                                        resource=pv.resource, for_update=True)
423
                         if pv.holder is not None
424
                         else None)
425
                    th = db_get_holding(holder=t, resource=pv.resource,
426
                                        for_update=True)
427
                except Holding.DoesNotExist:
428
                    m = "Corrupted provision"
429
                    raise CorruptedError(m)
430

    
431
                quantity = pv.quantity
432

    
433
                if quantity >= 0:
434
                    if h is not None:
435
                        operations.finalize(Export, h, quantity)
436
                    operations.finalize(Import, th, quantity)
437
                else: # release
438
                    abs_quantity = -quantity
439

    
440
                    if h is not None:
441
                        operations.finalize(Reclaim, h, abs_quantity)
442
                    operations.finalize(Release, th, abs_quantity)
443

    
444
                reason = 'ACCEPT:' + reason[-121:]
445
                self._log_provision(c, h, th, pv, log_time, reason)
446
                pv.delete()
447
            c.delete()
448

    
449
        return
450

    
451
    def reject_commission(self,
452
                          context=None, clientkey=None,
453
                          serials=[], reason=''):
454
        log_time = now()
455

    
456
        for serial in serials:
457
            try:
458
                c = db_get_commission(clientkey=clientkey, serial=serial,
459
                                      for_update=True)
460
            except Commission.DoesNotExist:
461
                return
462

    
463
            t = c.holder
464

    
465
            operations = Operations()
466

    
467
            provisions = db_filter_provision(serial=serial, for_update=True)
468
            for pv in provisions:
469
                try:
470
                    h = (db_get_holding(holder=pv.holder,
471
                                        resource=pv.resource, for_update=True)
472
                         if pv.holder is not None
473
                         else None)
474
                    th = db_get_holding(holder=t, resource=pv.resource,
475
                                        for_update=True)
476
                except Holding.DoesNotExist:
477
                    m = "Corrupted provision"
478
                    raise CorruptedError(m)
479

    
480
                quantity = pv.quantity
481

    
482
                if quantity >= 0:
483
                    if h is not None:
484
                        operations.undo(Export, h, quantity)
485
                    operations.undo(Import, th, quantity)
486
                else: # release
487
                    abs_quantity = -quantity
488

    
489
                    if h is not None:
490
                        operations.undo(Reclaim, h, abs_quantity)
491
                    operations.undo(Release, th, abs_quantity)
492

    
493
                reason = 'REJECT:' + reason[-121:]
494
                self._log_provision(c, h, th, pv, log_time, reason)
495
                pv.delete()
496
            c.delete()
497

    
498
        return
499

    
500
    def get_pending_commissions(self, context=None, clientkey=None):
501
        pending = Commission.objects.filter(clientkey=clientkey)
502
        pending_list = pending.values_list('serial', flat=True)
503
        return pending_list
504

    
505
    def resolve_pending_commissions(self,
506
                                    context=None, clientkey=None,
507
                                    max_serial=None, accept_set=[]):
508
        accept_set = set(accept_set)
509
        pending = self.get_pending_commissions(context=context,
510
                                               clientkey=clientkey)
511
        pending = sorted(pending)
512

    
513
        accept = self.accept_commission
514
        reject = self.reject_commission
515

    
516
        for serial in pending:
517
            if serial > max_serial:
518
                break
519

    
520
            if serial in accept_set:
521
                accept(context=context, clientkey=clientkey, serials=[serial])
522
            else:
523
                reject(context=context, clientkey=clientkey, serials=[serial])
524

    
525
        return
526

    
527
    def get_timeline(self, context=None, after="", before="Z", get_timeline=[]):
528
        holder_set = set()
529
        e_add = holder_set.add
530
        resource_set = set()
531
        r_add = resource_set.add
532

    
533
        for holder, resource in get_timeline:
534
            if holder not in holder_set:
535
                e_add(holder)
536

    
537
            r_add((holder, resource))
538

    
539
        chunk_size = 65536
540
        nr = 0
541
        timeline = []
542
        append = timeline.append
543
        filterlogs = ProvisionLog.objects.filter
544
        if holder_set:
545
            q_holder = Q(source__in=holder_set) | Q(target__in=holder_set)
546
        else:
547
            q_holder = Q()
548

    
549
        while 1:
550
            logs = filterlogs(q_holder,
551
                              issue_time__gt=after,
552
                              issue_time__lte=before,
553
                              reason__startswith='ACCEPT:')
554

    
555
            logs = logs.order_by('issue_time')
556
            #logs = logs.values()
557
            logs = logs[:chunk_size]
558
            nr += len(logs)
559
            if not logs:
560
                break
561
            for g in logs:
562
                if ((g.source, g.resource) not in resource_set
563
                    or (g.target, g.resource) not in resource_set):
564
                    continue
565

    
566
                o = {
567
                    'serial':                   g.serial,
568
                    'source':                   g.source,
569
                    'target':                   g.target,
570
                    'resource':                 g.resource,
571
                    'name':                     g.name,
572
                    'quantity':                 g.delta_quantity,
573
                    'source_allocated':         g.source_allocated(),
574
                    'source_allocated_through': g.source_allocated_through(),
575
                    'source_inbound':           g.source_inbound(),
576
                    'source_inbound_through':   g.source_inbound_through(),
577
                    'source_outbound':          g.source_outbound(),
578
                    'source_outbound_through':  g.source_outbound_through(),
579
                    'target_allocated':         g.target_allocated(),
580
                    'target_allocated_through': g.target_allocated_through(),
581
                    'target_inbound':           g.target_inbound(),
582
                    'target_inbound_through':   g.target_inbound_through(),
583
                    'target_outbound':          g.target_outbound(),
584
                    'target_outbound_through':  g.target_outbound_through(),
585
                    'issue_time':               g.issue_time,
586
                    'log_time':                 g.log_time,
587
                    'reason':                   g.reason,
588
                }
589

    
590
                append(o)
591

    
592
            after = g.issue_time
593
            if after >= before:
594
                break
595

    
596
        return timeline
597

    
598

    
599
API_Callpoint = QuotaholderDjangoDBCallpoint