Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / quotaholder / callpoint.py @ dc9da5b9

History | View | Annotate | Download (19.3 kB)

1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from astakos.quotaholder.exception import (
35
    QuotaholderError,
36
    CorruptedError, InvalidDataError,
37
    NoCapacityError,
38
    DuplicateError)
39

    
40
from astakos.quotaholder.commission import (
41
    Import, Release, Operations)
42

    
43
from astakos.quotaholder.utils.newname import newname
44
from astakos.quotaholder.api import QH_PRACTICALLY_INFINITE
45

    
46
from django.db.models import Q, Count
47
from django.db.models import Q
48
from .models import (Holding,
49
                     Commission, Provision, ProvisionLog,
50
                     now,
51
                     db_get_holding,
52
                     db_get_commission, db_filter_provision)
53

    
54

    
55
class QuotaholderDjangoDBCallpoint(object):
56

    
57
    def _init_holding(self,
58
                      holder, resource, limit,
59
                      imported_min, imported_max,
60
                      flags):
61
        try:
62
            h = db_get_holding(holder=holder, resource=resource,
63
                               for_update=True)
64
        except Holding.DoesNotExist:
65
            h = Holding(holder=holder, resource=resource)
66

    
67
        h.limit = limit
68
        h.flags = flags
69
        h.imported_min = imported_min
70
        h.imported_max = imported_max
71
        h.save()
72

    
73
    def init_holding(self, context=None, init_holding=[]):
74
        rejected = []
75
        append = rejected.append
76

    
77
        for idx, sfh in enumerate(init_holding):
78
            (holder, resource, limit,
79
             imported_min, imported_max,
80
             flags) = sfh
81

    
82
            self._init_holding(holder, resource, limit,
83
                               imported_min, imported_max,
84
                               flags)
85
        if rejected:
86
            raise QuotaholderError(rejected)
87
        return rejected
88

    
89
    def reset_holding(self, context=None, reset_holding=[]):
90
        rejected = []
91
        append = rejected.append
92

    
93
        for idx, tpl in enumerate(reset_holding):
94
            (holder, source, resource,
95
             imported_min, imported_max) = tpl
96

    
97
            try:
98
                h = db_get_holding(holder=holder,
99
                                   source=source,
100
                                   resource=resource,
101
                                   for_update=True)
102
                h.imported_min = imported_min
103
                h.imported_max = imported_max
104
                h.save()
105
            except Holding.DoesNotExist:
106
                append(idx)
107
                continue
108

    
109
        if rejected:
110
            raise QuotaholderError(rejected)
111
        return rejected
112

    
113
    def _check_pending(self, holding):
114
        ps = Provision.objects.filter(holding=holding)
115
        return ps.count()
116

    
117
    def release_holding(self, context=None, release_holding=[]):
118
        rejected = []
119
        append = rejected.append
120

    
121
        for idx, (holder, source, resource) in enumerate(release_holding):
122
            try:
123
                h = db_get_holding(holder=holder,
124
                                   source=source,
125
                                   resource=resource,
126
                                   for_update=True)
127
            except Holding.DoesNotExist:
128
                append(idx)
129
                continue
130

    
131
            if self._check_pending(h):
132
                append(idx)
133
                continue
134

    
135
            if h.imported_max > 0:
136
                append(idx)
137
                continue
138

    
139
            h.delete()
140

    
141
        if rejected:
142
            raise QuotaholderError(rejected)
143
        return rejected
144

    
145
    def list_resources(self, context=None, holder=None):
146
        holdings = Holding.objects.filter(holder=holder)
147
        resources = [h.resource for h in holdings]
148
        return resources
149

    
150
    def list_holdings(self, context=None, list_holdings=[]):
151
        rejected = []
152
        reject = rejected.append
153
        holdings_list = []
154
        append = holdings_list.append
155

    
156
        for holder in list_holdings:
157
            holdings = list(Holding.objects.filter(holder=holder))
158
            if not holdings:
159
                reject(holder)
160
                continue
161

    
162
            append([(holder, h.source, h.resource,
163
                     h.imported_min, h.imported_max)
164
                    for h in holdings])
165

    
166
        return holdings_list, rejected
167

    
168
    def get_holder_quota(self, holders=None, sources=None, resources=None):
169
        holdings = Holding.objects.filter(holder__in=holders)
170

    
171
        if sources is not None:
172
            holdings = holdings.filter(source__in=sources)
173

    
174
        if resources is not None:
175
            holdings = holdings.filter(resource__in=resources)
176

    
177
        quotas = {}
178
        for holding in holdings:
179
            key = (holding.holder, holding.source, holding.resource)
180
            value = (holding.limit, holding.imported_min, holding.imported_max)
181
            quotas[key] = value
182

    
183
        return quotas
184

    
185
    def get_quota(self, context=None, get_quota=[]):
186
        quotas = []
187
        append = quotas.append
188

    
189
        holders = set(holder for holder, r in get_quota)
190
        hs = Holding.objects.filter(holder__in=holders)
191
        holdings = {}
192
        for h in hs:
193
            holdings[(h.holder, h.source, h.resource)] = h
194

    
195
        for holder, source, resource in get_quota:
196
            try:
197
                h = holdings[(holder, source, resource)]
198
            except:
199
                continue
200

    
201
            append((h.holder, h.source, h.resource, h.limit,
202
                    h.imported_min, h.imported_max,
203
                    h.flags))
204

    
205
        return quotas
206

    
207
    def set_holder_quota(self, quotas):
208
        holders = quotas.keys()
209
        hs = Holding.objects.filter(holder__in=holders).select_for_update()
210
        holdings = {}
211
        for h in hs:
212
            holdings[(h.holder, h.source, h.resource)] = h
213

    
214
        for holder, holder_quota in quotas.iteritems():
215
            for source, source_quota in holder_quota.iteritems():
216
                for resource, limit in source_quota.iteritems():
217
                    try:
218
                        h = holdings[(holder, source, resource)]
219
                    except KeyError:
220
                        h = Holding(holder=holder,
221
                                    source=source,
222
                                    resource=resource)
223

    
224
                    h.limit = limit
225
                    h.save()
226

    
227
    def set_quota(self, context=None, set_quota=[]):
228
        rejected = []
229
        append = rejected.append
230

    
231
        q_holdings = Q()
232
        holders = []
233
        for (holder, source, resource, _, _) in set_quota:
234
            holders.append(holder)
235

    
236
        hs = Holding.objects.filter(holder__in=holders).select_for_update()
237
        holdings = {}
238
        for h in hs:
239
            holdings[(h.holder, h.source, h.resource)] = h
240

    
241
        for (holder, source, resource,
242
             limit,
243
             flags) in set_quota:
244

    
245
            try:
246
                h = holdings[(holder, source, resource)]
247
                h.flags = flags
248
            except KeyError:
249
                h = Holding(holder=holder,
250
                            source=source,
251
                            resource=resource,
252
                            flags=flags)
253

    
254
            h.limit = limit
255
            h.save()
256
            holdings[(holder, source, resource)] = h
257

    
258
        if rejected:
259
            raise QuotaholderError(rejected)
260
        return rejected
261

    
262
    def add_quota(self,
263
                  context=None,
264
                  sub_quota=[], add_quota=[]):
265
        rejected = []
266
        append = rejected.append
267

    
268
        sources = sub_quota + add_quota
269
        q_holdings = Q()
270
        holders = []
271
        for (holder, resource, _) in sources:
272
            holders.append(holder)
273

    
274
        hs = Holding.objects.filter(holder__in=holders).select_for_update()
275
        holdings = {}
276
        for h in hs:
277
            holdings[(h.holder, h.resource)] = h
278

    
279
        for removing, source in [(True, sub_quota), (False, add_quota)]:
280
            for (holder, resource,
281
                 limit,
282
                 ) in source:
283

    
284
                try:
285
                    h = holdings[(holder, resource)]
286
                    current_limit = h.limit
287
                except KeyError:
288
                    if removing:
289
                        append((holder, resource))
290
                        continue
291

    
292
                    h = Holding(holder=holder, resource=resource, flags=0)
293
                    current_limit = 0
294

    
295
                h.limit = (current_limit - limit if removing else
296
                           current_limit + limit)
297

    
298
                if h.limit < 0:
299
                    append((holder, resource))
300
                    continue
301

    
302
                h.save()
303
                holdings[(holder, resource)] = h
304

    
305
        if rejected:
306
            raise QuotaholderError(rejected)
307

    
308
        return rejected
309

    
310
    def issue_commission(self,
311
                         context=None,
312
                         clientkey=None,
313
                         name=None,
314
                         provisions=()):
315

    
316
        if name is None:
317
            name = ""
318
        create = Commission.objects.create
319
        commission = create(clientkey=clientkey, name=name)
320
        serial = commission.serial
321

    
322
        operations = Operations()
323

    
324
        try:
325
            checked = []
326
            for holder, source, resource, quantity in provisions:
327

    
328
                if holder == source:
329
                    m = ("Cannot issue commission from a holder "
330
                         "to itself (%s)" % (holder,))
331
                    raise InvalidDataError(m)
332

    
333
                ent_res = holder, resource
334
                if ent_res in checked:
335
                    m = "Duplicate provision for %s.%s" % ent_res
336
                    raise DuplicateError(m)
337
                checked.append(ent_res)
338

    
339
                # Target
340
                try:
341
                    th = db_get_holding(holder=holder,
342
                                        resource=resource,
343
                                        source=source,
344
                                        for_update=True)
345
                except Holding.DoesNotExist:
346
                    m = ("There is no capacity "
347
                         "to allocate into in %s.%s" % (holder, resource))
348
                    raise NoCapacityError(m,
349
                                          holder=holder,
350
                                          resource=resource,
351
                                          requested=quantity,
352
                                          current=0,
353
                                          limit=0)
354

    
355
                if quantity >= 0:
356
                    operations.prepare(Import, th, quantity)
357

    
358
                else: # release
359
                    abs_quantity = -quantity
360
                    operations.prepare(Release, th, abs_quantity)
361

    
362
                Provision.objects.create(serial=commission,
363
                                         holding=th,
364
                                         quantity=quantity)
365

    
366
        except QuotaholderError:
367
            operations.revert()
368
            raise
369

    
370
        return serial
371

    
372
    def _log_provision(self,
373
                       commission, provision, log_time, reason):
374

    
375
        holding = provision.holding
376

    
377
        kwargs = {
378
            'serial':              commission.serial,
379
            'name':                commission.name,
380
            'holder':              holding.holder,
381
            'source':              holding.source,
382
            'resource':            holding.resource,
383
            'limit':               holding.limit,
384
            'imported_min':        holding.imported_min,
385
            'imported_max':        holding.imported_max,
386
            'delta_quantity':      provision.quantity,
387
            'issue_time':          commission.issue_time,
388
            'log_time':            log_time,
389
            'reason':              reason,
390
        }
391

    
392
        ProvisionLog.objects.create(**kwargs)
393

    
394
    def accept_commission(self,
395
                          context=None, clientkey=None,
396
                          serials=[], reason=''):
397
        log_time = now()
398

    
399
        for serial in serials:
400
            try:
401
                c = db_get_commission(clientkey=clientkey, serial=serial,
402
                                      for_update=True)
403
            except Commission.DoesNotExist:
404
                return
405

    
406
            operations = Operations()
407

    
408
            provisions = db_filter_provision(serial=serial, for_update=True)
409
            for pv in provisions:
410
                try:
411
                    th = db_get_holding(id=pv.holding_id,
412
                                        for_update=True)
413
                except Holding.DoesNotExist:
414
                    m = "Corrupted provision"
415
                    raise CorruptedError(m)
416

    
417
                quantity = pv.quantity
418

    
419
                if quantity >= 0:
420
                    operations.finalize(Import, th, quantity)
421
                else: # release
422
                    abs_quantity = -quantity
423
                    operations.finalize(Release, th, abs_quantity)
424

    
425
                reason = 'ACCEPT:' + reason[-121:]
426
                self._log_provision(c, pv, log_time, reason)
427
                pv.delete()
428
            c.delete()
429

    
430
        return
431

    
432
    def reject_commission(self,
433
                          context=None, clientkey=None,
434
                          serials=[], reason=''):
435
        log_time = now()
436

    
437
        for serial in serials:
438
            try:
439
                c = db_get_commission(clientkey=clientkey, serial=serial,
440
                                      for_update=True)
441
            except Commission.DoesNotExist:
442
                return
443

    
444
            operations = Operations()
445

    
446
            provisions = db_filter_provision(serial=serial, for_update=True)
447
            for pv in provisions:
448
                try:
449
                    th = db_get_holding(id=pv.holding_id,
450
                                        for_update=True)
451
                except Holding.DoesNotExist:
452
                    m = "Corrupted provision"
453
                    raise CorruptedError(m)
454

    
455
                quantity = pv.quantity
456

    
457
                if quantity >= 0:
458
                    operations.undo(Import, th, quantity)
459
                else: # release
460
                    abs_quantity = -quantity
461
                    operations.undo(Release, th, abs_quantity)
462

    
463
                reason = 'REJECT:' + reason[-121:]
464
                self._log_provision(c, pv, log_time, reason)
465
                pv.delete()
466
            c.delete()
467

    
468
        return
469

    
470
    def get_pending_commissions(self, context=None, clientkey=None):
471
        pending = Commission.objects.filter(clientkey=clientkey)
472
        pending_list = pending.values_list('serial', flat=True)
473
        return pending_list
474

    
475
    def resolve_pending_commissions(self,
476
                                    context=None, clientkey=None,
477
                                    max_serial=None, accept_set=[]):
478
        accept_set = set(accept_set)
479
        pending = self.get_pending_commissions(context=context,
480
                                               clientkey=clientkey)
481
        pending = sorted(pending)
482

    
483
        accept = self.accept_commission
484
        reject = self.reject_commission
485

    
486
        for serial in pending:
487
            if serial > max_serial:
488
                break
489

    
490
            if serial in accept_set:
491
                accept(context=context, clientkey=clientkey, serials=[serial])
492
            else:
493
                reject(context=context, clientkey=clientkey, serials=[serial])
494

    
495
        return
496

    
497
    def get_timeline(self, context=None, after="", before="Z", get_timeline=[]):
498
        holder_set = set()
499
        e_add = holder_set.add
500
        resource_set = set()
501
        r_add = resource_set.add
502

    
503
        for holder, resource in get_timeline:
504
            if holder not in holder_set:
505
                e_add(holder)
506

    
507
            r_add((holder, resource))
508

    
509
        chunk_size = 65536
510
        nr = 0
511
        timeline = []
512
        append = timeline.append
513
        filterlogs = ProvisionLog.objects.filter
514
        if holder_set:
515
            q_holder = Q(source__in=holder_set) | Q(target__in=holder_set)
516
        else:
517
            q_holder = Q()
518

    
519
        while 1:
520
            logs = filterlogs(q_holder,
521
                              issue_time__gt=after,
522
                              issue_time__lte=before,
523
                              reason__startswith='ACCEPT:')
524

    
525
            logs = logs.order_by('issue_time')
526
            #logs = logs.values()
527
            logs = logs[:chunk_size]
528
            nr += len(logs)
529
            if not logs:
530
                break
531
            for g in logs:
532
                if ((g.source, g.resource) not in resource_set
533
                    or (g.target, g.resource) not in resource_set):
534
                    continue
535

    
536
                o = {
537
                    'serial':                   g.serial,
538
                    'source':                   g.source,
539
                    'target':                   g.target,
540
                    'resource':                 g.resource,
541
                    'name':                     g.name,
542
                    'quantity':                 g.delta_quantity,
543
                    'source_allocated':         g.source_allocated(),
544
                    'source_allocated_through': g.source_allocated_through(),
545
                    'source_inbound':           g.source_inbound(),
546
                    'source_inbound_through':   g.source_inbound_through(),
547
                    'source_outbound':          g.source_outbound(),
548
                    'source_outbound_through':  g.source_outbound_through(),
549
                    'target_allocated':         g.target_allocated(),
550
                    'target_allocated_through': g.target_allocated_through(),
551
                    'target_inbound':           g.target_inbound(),
552
                    'target_inbound_through':   g.target_inbound_through(),
553
                    'target_outbound':          g.target_outbound(),
554
                    'target_outbound_through':  g.target_outbound_through(),
555
                    'issue_time':               g.issue_time,
556
                    'log_time':                 g.log_time,
557
                    'reason':                   g.reason,
558
                }
559

    
560
                append(o)
561

    
562
            after = g.issue_time
563
            if after >= before:
564
                break
565

    
566
        return timeline
567

    
568

    
569
API_Callpoint = QuotaholderDjangoDBCallpoint