Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / quotaholder / callpoint.py @ b78f2b29

History | View | Annotate | Download (18 kB)

1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from astakos.quotaholder.exception import (
35
    QuotaholderError,
36
    CorruptedError, InvalidDataError,
37
    NoCapacityError,
38
    DuplicateError)
39

    
40
from astakos.quotaholder.commission import (
41
    Import, Release, Operations)
42

    
43
from astakos.quotaholder.utils.newname import newname
44
from astakos.quotaholder.api import QH_PRACTICALLY_INFINITE
45

    
46
from django.db.models import Q, Count
47
from django.db.models import Q
48
from .models import (Holding,
49
                     Commission, Provision, ProvisionLog,
50
                     now,
51
                     db_get_holding,
52
                     db_get_commission, db_filter_provision)
53

    
54

    
55
class QuotaholderDjangoDBCallpoint(object):
56

    
57
    def _init_holding(self,
58
                      holder, resource, limit,
59
                      imported_min, imported_max,
60
                      flags):
61
        try:
62
            h = db_get_holding(holder=holder, resource=resource,
63
                               for_update=True)
64
        except Holding.DoesNotExist:
65
            h = Holding(holder=holder, resource=resource)
66

    
67
        h.limit = limit
68
        h.flags = flags
69
        h.imported_min = imported_min
70
        h.imported_max = imported_max
71
        h.save()
72

    
73
    def init_holding(self, context=None, init_holding=[]):
74
        rejected = []
75
        append = rejected.append
76

    
77
        for idx, sfh in enumerate(init_holding):
78
            (holder, resource, limit,
79
             imported_min, imported_max,
80
             flags) = sfh
81

    
82
            self._init_holding(holder, resource, limit,
83
                               imported_min, imported_max,
84
                               flags)
85
        if rejected:
86
            raise QuotaholderError(rejected)
87
        return rejected
88

    
89
    def reset_holding(self, context=None, reset_holding=[]):
90
        rejected = []
91
        append = rejected.append
92

    
93
        for idx, tpl in enumerate(reset_holding):
94
            (holder, source, resource,
95
             imported_min, imported_max) = tpl
96

    
97
            try:
98
                h = db_get_holding(holder=holder,
99
                                   source=source,
100
                                   resource=resource,
101
                                   for_update=True)
102
                h.imported_min = imported_min
103
                h.imported_max = imported_max
104
                h.save()
105
            except Holding.DoesNotExist:
106
                append(idx)
107
                continue
108

    
109
        if rejected:
110
            raise QuotaholderError(rejected)
111
        return rejected
112

    
113
    def _check_pending(self, holding):
114
        ps = Provision.objects.filter(holding=holding)
115
        return ps.count()
116

    
117
    def release_holding(self, context=None, release_holding=[]):
118
        rejected = []
119
        append = rejected.append
120

    
121
        for idx, (holder, source, resource) in enumerate(release_holding):
122
            try:
123
                h = db_get_holding(holder=holder,
124
                                   source=source,
125
                                   resource=resource,
126
                                   for_update=True)
127
            except Holding.DoesNotExist:
128
                append(idx)
129
                continue
130

    
131
            if self._check_pending(h):
132
                append(idx)
133
                continue
134

    
135
            if h.imported_max > 0:
136
                append(idx)
137
                continue
138

    
139
            h.delete()
140

    
141
        if rejected:
142
            raise QuotaholderError(rejected)
143
        return rejected
144

    
145
    def list_resources(self, context=None, holder=None):
146
        holdings = Holding.objects.filter(holder=holder)
147
        resources = [h.resource for h in holdings]
148
        return resources
149

    
150
    def list_holdings(self, context=None, list_holdings=[]):
151
        rejected = []
152
        reject = rejected.append
153
        holdings_list = []
154
        append = holdings_list.append
155

    
156
        for holder in list_holdings:
157
            holdings = list(Holding.objects.filter(holder=holder))
158
            if not holdings:
159
                reject(holder)
160
                continue
161

    
162
            append([(holder, h.source, h.resource,
163
                     h.imported_min, h.imported_max)
164
                    for h in holdings])
165

    
166
        return holdings_list, rejected
167

    
168
    def get_quota(self, context=None, get_quota=[]):
169
        quotas = []
170
        append = quotas.append
171

    
172
        holders = set(holder for holder, r in get_quota)
173
        hs = Holding.objects.filter(holder__in=holders)
174
        holdings = {}
175
        for h in hs:
176
            holdings[(h.holder, h.source, h.resource)] = h
177

    
178
        for holder, source, resource in get_quota:
179
            try:
180
                h = holdings[(holder, source, resource)]
181
            except:
182
                continue
183

    
184
            append((h.holder, h.source, h.resource, h.limit,
185
                    h.imported_min, h.imported_max,
186
                    h.flags))
187

    
188
        return quotas
189

    
190
    def set_quota(self, context=None, set_quota=[]):
191
        rejected = []
192
        append = rejected.append
193

    
194
        q_holdings = Q()
195
        holders = []
196
        for (holder, source, resource, _, _) in set_quota:
197
            holders.append(holder)
198

    
199
        hs = Holding.objects.filter(holder__in=holders).select_for_update()
200
        holdings = {}
201
        for h in hs:
202
            holdings[(h.holder, h.source, h.resource)] = h
203

    
204
        for (holder, source, resource,
205
             limit,
206
             flags) in set_quota:
207

    
208
            try:
209
                h = holdings[(holder, source, resource)]
210
                h.flags = flags
211
            except KeyError:
212
                h = Holding(holder=holder,
213
                            source=source,
214
                            resource=resource,
215
                            flags=flags)
216

    
217
            h.limit = limit
218
            h.save()
219
            holdings[(holder, source, resource)] = h
220

    
221
        if rejected:
222
            raise QuotaholderError(rejected)
223
        return rejected
224

    
225
    def add_quota(self,
226
                  context=None,
227
                  sub_quota=[], add_quota=[]):
228
        rejected = []
229
        append = rejected.append
230

    
231
        sources = sub_quota + add_quota
232
        q_holdings = Q()
233
        holders = []
234
        for (holder, resource, _) in sources:
235
            holders.append(holder)
236

    
237
        hs = Holding.objects.filter(holder__in=holders).select_for_update()
238
        holdings = {}
239
        for h in hs:
240
            holdings[(h.holder, h.resource)] = h
241

    
242
        for removing, source in [(True, sub_quota), (False, add_quota)]:
243
            for (holder, resource,
244
                 limit,
245
                 ) in source:
246

    
247
                try:
248
                    h = holdings[(holder, resource)]
249
                    current_limit = h.limit
250
                except KeyError:
251
                    if removing:
252
                        append((holder, resource))
253
                        continue
254

    
255
                    h = Holding(holder=holder, resource=resource, flags=0)
256
                    current_limit = 0
257

    
258
                h.limit = (current_limit - limit if removing else
259
                           current_limit + limit)
260

    
261
                if h.limit < 0:
262
                    append((holder, resource))
263
                    continue
264

    
265
                h.save()
266
                holdings[(holder, resource)] = h
267

    
268
        if rejected:
269
            raise QuotaholderError(rejected)
270

    
271
        return rejected
272

    
273
    def issue_commission(self,
274
                         context=None,
275
                         clientkey=None,
276
                         name=None,
277
                         provisions=()):
278

    
279
        if name is None:
280
            name = ""
281
        create = Commission.objects.create
282
        commission = create(clientkey=clientkey, name=name)
283
        serial = commission.serial
284

    
285
        operations = Operations()
286

    
287
        try:
288
            checked = []
289
            for holder, source, resource, quantity in provisions:
290

    
291
                if holder == source:
292
                    m = ("Cannot issue commission from a holder "
293
                         "to itself (%s)" % (holder,))
294
                    raise InvalidDataError(m)
295

    
296
                ent_res = holder, resource
297
                if ent_res in checked:
298
                    m = "Duplicate provision for %s.%s" % ent_res
299
                    raise DuplicateError(m)
300
                checked.append(ent_res)
301

    
302
                # Target
303
                try:
304
                    th = db_get_holding(holder=holder,
305
                                        resource=resource,
306
                                        source=source,
307
                                        for_update=True)
308
                except Holding.DoesNotExist:
309
                    m = ("There is no capacity "
310
                         "to allocate into in %s.%s" % (holder, resource))
311
                    raise NoCapacityError(m,
312
                                          holder=holder,
313
                                          resource=resource,
314
                                          requested=quantity,
315
                                          current=0,
316
                                          limit=0)
317

    
318
                if quantity >= 0:
319
                    operations.prepare(Import, th, quantity)
320

    
321
                else: # release
322
                    abs_quantity = -quantity
323
                    operations.prepare(Release, th, abs_quantity)
324

    
325
                Provision.objects.create(serial=commission,
326
                                         holding=th,
327
                                         quantity=quantity)
328

    
329
        except QuotaholderError:
330
            operations.revert()
331
            raise
332

    
333
        return serial
334

    
335
    def _log_provision(self,
336
                       commission, provision, log_time, reason):
337

    
338
        holding = provision.holding
339

    
340
        kwargs = {
341
            'serial':              commission.serial,
342
            'name':                commission.name,
343
            'holder':              holding.holder,
344
            'source':              holding.source,
345
            'resource':            holding.resource,
346
            'limit':               holding.limit,
347
            'imported_min':        holding.imported_min,
348
            'imported_max':        holding.imported_max,
349
            'delta_quantity':      provision.quantity,
350
            'issue_time':          commission.issue_time,
351
            'log_time':            log_time,
352
            'reason':              reason,
353
        }
354

    
355
        ProvisionLog.objects.create(**kwargs)
356

    
357
    def accept_commission(self,
358
                          context=None, clientkey=None,
359
                          serials=[], reason=''):
360
        log_time = now()
361

    
362
        for serial in serials:
363
            try:
364
                c = db_get_commission(clientkey=clientkey, serial=serial,
365
                                      for_update=True)
366
            except Commission.DoesNotExist:
367
                return
368

    
369
            operations = Operations()
370

    
371
            provisions = db_filter_provision(serial=serial, for_update=True)
372
            for pv in provisions:
373
                try:
374
                    th = db_get_holding(id=pv.holding_id,
375
                                        for_update=True)
376
                except Holding.DoesNotExist:
377
                    m = "Corrupted provision"
378
                    raise CorruptedError(m)
379

    
380
                quantity = pv.quantity
381

    
382
                if quantity >= 0:
383
                    operations.finalize(Import, th, quantity)
384
                else: # release
385
                    abs_quantity = -quantity
386
                    operations.finalize(Release, th, abs_quantity)
387

    
388
                reason = 'ACCEPT:' + reason[-121:]
389
                self._log_provision(c, pv, log_time, reason)
390
                pv.delete()
391
            c.delete()
392

    
393
        return
394

    
395
    def reject_commission(self,
396
                          context=None, clientkey=None,
397
                          serials=[], reason=''):
398
        log_time = now()
399

    
400
        for serial in serials:
401
            try:
402
                c = db_get_commission(clientkey=clientkey, serial=serial,
403
                                      for_update=True)
404
            except Commission.DoesNotExist:
405
                return
406

    
407
            operations = Operations()
408

    
409
            provisions = db_filter_provision(serial=serial, for_update=True)
410
            for pv in provisions:
411
                try:
412
                    th = db_get_holding(id=pv.holding_id,
413
                                        for_update=True)
414
                except Holding.DoesNotExist:
415
                    m = "Corrupted provision"
416
                    raise CorruptedError(m)
417

    
418
                quantity = pv.quantity
419

    
420
                if quantity >= 0:
421
                    operations.undo(Import, th, quantity)
422
                else: # release
423
                    abs_quantity = -quantity
424
                    operations.undo(Release, th, abs_quantity)
425

    
426
                reason = 'REJECT:' + reason[-121:]
427
                self._log_provision(c, pv, log_time, reason)
428
                pv.delete()
429
            c.delete()
430

    
431
        return
432

    
433
    def get_pending_commissions(self, context=None, clientkey=None):
434
        pending = Commission.objects.filter(clientkey=clientkey)
435
        pending_list = pending.values_list('serial', flat=True)
436
        return pending_list
437

    
438
    def resolve_pending_commissions(self,
439
                                    context=None, clientkey=None,
440
                                    max_serial=None, accept_set=[]):
441
        accept_set = set(accept_set)
442
        pending = self.get_pending_commissions(context=context,
443
                                               clientkey=clientkey)
444
        pending = sorted(pending)
445

    
446
        accept = self.accept_commission
447
        reject = self.reject_commission
448

    
449
        for serial in pending:
450
            if serial > max_serial:
451
                break
452

    
453
            if serial in accept_set:
454
                accept(context=context, clientkey=clientkey, serials=[serial])
455
            else:
456
                reject(context=context, clientkey=clientkey, serials=[serial])
457

    
458
        return
459

    
460
    def get_timeline(self, context=None, after="", before="Z", get_timeline=[]):
461
        holder_set = set()
462
        e_add = holder_set.add
463
        resource_set = set()
464
        r_add = resource_set.add
465

    
466
        for holder, resource in get_timeline:
467
            if holder not in holder_set:
468
                e_add(holder)
469

    
470
            r_add((holder, resource))
471

    
472
        chunk_size = 65536
473
        nr = 0
474
        timeline = []
475
        append = timeline.append
476
        filterlogs = ProvisionLog.objects.filter
477
        if holder_set:
478
            q_holder = Q(source__in=holder_set) | Q(target__in=holder_set)
479
        else:
480
            q_holder = Q()
481

    
482
        while 1:
483
            logs = filterlogs(q_holder,
484
                              issue_time__gt=after,
485
                              issue_time__lte=before,
486
                              reason__startswith='ACCEPT:')
487

    
488
            logs = logs.order_by('issue_time')
489
            #logs = logs.values()
490
            logs = logs[:chunk_size]
491
            nr += len(logs)
492
            if not logs:
493
                break
494
            for g in logs:
495
                if ((g.source, g.resource) not in resource_set
496
                    or (g.target, g.resource) not in resource_set):
497
                    continue
498

    
499
                o = {
500
                    'serial':                   g.serial,
501
                    'source':                   g.source,
502
                    'target':                   g.target,
503
                    'resource':                 g.resource,
504
                    'name':                     g.name,
505
                    'quantity':                 g.delta_quantity,
506
                    'source_allocated':         g.source_allocated(),
507
                    'source_allocated_through': g.source_allocated_through(),
508
                    'source_inbound':           g.source_inbound(),
509
                    'source_inbound_through':   g.source_inbound_through(),
510
                    'source_outbound':          g.source_outbound(),
511
                    'source_outbound_through':  g.source_outbound_through(),
512
                    'target_allocated':         g.target_allocated(),
513
                    'target_allocated_through': g.target_allocated_through(),
514
                    'target_inbound':           g.target_inbound(),
515
                    'target_inbound_through':   g.target_inbound_through(),
516
                    'target_outbound':          g.target_outbound(),
517
                    'target_outbound_through':  g.target_outbound_through(),
518
                    'issue_time':               g.issue_time,
519
                    'log_time':                 g.log_time,
520
                    'reason':                   g.reason,
521
                }
522

    
523
                append(o)
524

    
525
            after = g.issue_time
526
            if after >= before:
527
                break
528

    
529
        return timeline
530

    
531

    
532
API_Callpoint = QuotaholderDjangoDBCallpoint