Statistics
| Branch: | Tag: | Revision:

root / commissioning / servers / quotaholder / django_backend / callpoint.py @ 4d516a3b

History | View | Annotate | Download (22 kB)

1

    
2
from commissioning import ( QuotaholderAPI,
3
                            Callpoint, CommissionException,
4
                            CorruptedError, InvalidDataError,
5
                            InvalidKeyError, NoEntityError,
6
                            NoQuantityError, NoCapacityError,
7
                            ExportLimitError, ImportLimitError)
8

    
9

    
10
from commissioning.utils.newname import newname
11
from django.db.models import Model, BigIntegerField, CharField, ForeignKey, Q
12
from django.db import transaction, IntegrityError
13
from .models import (Holder, Entity, Policy, Holding,
14
                     Commission, Provision, ProvisionLog, now)
15

    
16

    
17
class QuotaholderDjangoDBCallpoint(Callpoint):
18

    
19
    api_spec = QuotaholderAPI()
20

    
21
    http_exc_lookup = {
22
        CorruptedError:   550,
23
        InvalidDataError: 400,
24
        InvalidKeyError:  401,
25
        NoEntityError:    404,
26
        NoQuantityError:  413,
27
        NoCapacityError:  413,
28
    }
29

    
30
    def init_connection(self, connection):
31
        if connection is not None:
32
            raise ValueError("Cannot specify connection args with %s" %
33
                             type(self).__name__)
34
        pass
35

    
36
    def commit(self):
37
        transaction.commit()
38

    
39
    def rollback(self):
40
        transaction.rollback()
41

    
42
    def do_make_call(self, call_name, data):
43
        call_fn = getattr(self, call_name, None)
44
        if not call_fn:
45
            m = "cannot find call '%s'" % (call_name,)
46
            raise CorruptedError(m)
47

    
48
        return call_fn(**data)
49

    
50
    def create_entity(self, context={}, create_entity=()):
51
        rejected = []
52
        append = rejected.append
53

    
54
        for entity, owner, key, ownerkey in create_entity:
55
            try:
56
                owner = Entity.objects.get(entity=owner, key=ownerkey)
57
            except Entity.DoesNotExist:
58
                append(entity)
59
                continue
60

    
61
            try:
62
                e = Entity.objects.get(entity=entity, owner=owner)
63
                append(entity)
64
            except Entity.DoesNotExist:
65
                e = Entity.objects.create(entity=entity,
66
                                          owner=owner,
67
                                          key=key)
68
        return rejected
69

    
70
    def set_entity_key(self, context={}, set_entity_key=()):
71
        rejected = []
72
        append = rejected.append
73

    
74
        for entity, key, newkey in set_entity_key:
75
            try:
76
                e = Entity.objects.get(entity=entity, key=key)
77
            except Entity.DoesNotExist:
78
                append(entity)
79
                continue
80

    
81
            e.key = newkey
82
            e.save()
83

    
84
        return rejected
85

    
86
    def list_entities(self, context={}, entity=None, key=None):
87
        try:
88
            e = Entity.objects.get(entity=entity, key=key)
89
        except Entity.DoesNotExist:
90
            m = "Entity '%s' does not exist" % (entity,)
91
            raise NoEntityError(m)
92

    
93
        children = e.entities.all()
94
        entities = [e.entity for e in children]
95
        return entities
96

    
97
    def get_entity(self, context={}, get_entity=()):
98
        entities = []
99
        append = entities.append
100

    
101
        for entity, key in get_entity:
102
            try:
103
                Entity.objects.get(entity=entity, key=key)
104
            except Entity.DoesNotExist:
105
                continue
106

    
107
            append((entity, key))
108

    
109
        return entities
110

    
111
    def get_limits(self, context={}, get_limits=()):
112
        limits = []
113
        append = limits.append
114

    
115
        for entity, resource, key in get_limits:
116
            try:
117
                h = Holding.objects.get(entity=entity, resource=resource)
118
            except Policy.DoesNotExist:
119
                continue
120

    
121
            if h.entity.key != key:
122
                continue
123
            p = h.policy
124
            append((h.entity, h.resource, p.quantity, p.capacity,
125
                    p.import_limit, p.export_limit, h.flags))
126

    
127
        return limits
128

    
129
    def set_limits(self, context={}, set_limits=()):
130

    
131
        for (   policy, quantity, capacity,
132
                import_limit, export_limit  ) in set_limits:
133

    
134
                try:
135
                    policy = Policy.objects.get(policy=policy)
136
                except Policy.DoesNotExist:
137
                    Policy.objects.create(  policy=policy,
138
                                            quantity=quantity,
139
                                            capacity=capacity,
140
                                            import_limit=import_limit,
141
                                            export_limit=export_limit   )
142
                else:
143
                    policy.quantity = quantity
144
                    policy.capacity = capacity
145
                    policy.export_limit = export_limit
146
                    policy.import_limit = import_limit
147
                    policy.save()
148

    
149
        return ()
150

    
151
    def get_holding(self, context={}, get_holding=()):
152
        holdings = []
153
        append = holdings.append
154

    
155
        for entity, resource, key in get_holding:
156
            try:
157
                h = Holding.objects.get(entity=entity, resource=resource)
158
            except Holding.DoesNotExist:
159
                continue
160

    
161
            if h.entity.key != key:
162
                continue
163

    
164
            append((h.entity.entity, h.resource, h.policy,
165
                    h.imported, h.exported,
166
                    h.returned, h.released, h.flags))
167

    
168
        return holdings
169

    
170
    def set_holding(self, context={}, set_holding=()):
171
        rejected = []
172
        append = rejected.append
173

    
174
        for entity, resource, key, policy, flags in set_holding:
175
            try:
176
                e = Entity.objects.get(entity=entity, key=key)
177
            except Entity.DoesNotExist:
178
                append((entity, resource, policy))
179
                continue
180

    
181
            if e.key != key:
182
                append((entity, resource, policy))
183
                continue
184

    
185
            try:
186
                p = Policy.objects.get(policy=policy)
187
            except Policy.DoesNotExist:
188
                append((entity, resource, policy))
189
                continue
190

    
191
            try:
192
                h = Holding.objects.get(entity=entity, resource=resource)
193
                h.policy = p
194
                h.flags = flags
195
                h.save()
196
            except Holding.DoesNotExist:
197
                h = Holding.objects.create( entity=entity, resource=resource,
198
                                            policy=policy, flags=flags      )
199

    
200
        return rejected
201

    
202
    def list_resources(self, context={}, entity=None, key=None):
203
        try:
204
            e = Entity.objects.get(entity=entity)
205
        except Entity.DoesNotExist:
206
            m = "No such entity '%s'" % (entity,)
207
            raise NoEntityError(m)
208

    
209
        if e.key != key:
210
            m = "Invalid key for entity '%s'" % (entity,)
211
            raise InvalidKeyError(m)
212

    
213
        holdings = e.holding_set.filter(entity=entity)
214
        resources = [h.resource for h in holdings]
215
        return resources
216

    
217
    def get_quota(self, context={}, get_quota=()):
218
        quotas = []
219
        append = quotas.append
220

    
221
        for entity, resource, key in get_quota:
222
            try:
223
                h = Holding.objects.get(entity=entity, resource=resource)
224
            except Holding.DoesNotExist:
225
                continue
226

    
227
            if h.entity.key != key:
228
                continue
229

    
230
            p = h.policy
231

    
232
            append((h.entity.entity, h.resource, p.quantity, p.capacity,
233
                    p.import_limit, p.export_limit,
234
                    h.imported, h.exported,
235
                    h.returned, h.released,
236
                    h.flags))
237

    
238
        return quotas
239

    
240
    def set_quota(self, context={}, set_quota=()):
241
        rejected = []
242
        append = rejected.append
243

    
244
        for (   entity, resource, key,
245
                quantity, capacity,
246
                import_limit, export_limit, flags  ) in set_quota:
247

    
248
                p = None
249

    
250
                try:
251
                    h = Holding.objects.get(entity=entity, resource=resource)
252
                    if h.entity.key != key:
253
                        append((entity, resource))
254
                        continue
255
                    p = h.policy
256

    
257
                except Holding.DoesNotExist:
258
                    try:
259
                        e = Entity.objects.get(entity=entity)
260
                    except Entity.DoesNotExist:
261
                        append((entity, resource))
262
                        continue
263

    
264
                    if e.key != key:
265
                        append((entity, resource))
266
                        continue
267

    
268
                    h = None
269

    
270
                policy = newname('policy_')
271
                newp = Policy   (
272
                            policy=policy,
273
                            quantity=quantity,
274
                            capacity=capacity,
275
                            import_limit=import_limit,
276
                            export_limit=export_limit
277
                )
278

    
279
                if h is None:
280
                    h = Holding(entity=e, resource=resource,
281
                                policy=newp, flags=flags)
282
                else:
283
                    h.policy = newp
284
                    h.flags = flags
285

    
286
                h.save()
287
                newp.save()
288

    
289
                if p is not None and p.holding_set.count() == 0:
290
                    p.delete()
291

    
292
        return rejected
293

    
294
    def issue_commission(self,  context     =   {},
295
                                clientkey   =   None,
296
                                target      =   None,
297
                                key         =   None,
298
                                owner       =   None,
299
                                ownerkey    =   None,
300
                                name        =   None,
301
                                provisions  =   ()  ):
302

    
303
        try:
304
            t = Entity.objects.get(entity=target)
305
        except Entity.DoesNotExist:
306
            create_entity = ((target, owner, key, ownerkey),)
307
            rejected = self.create_entity(context       =   context,
308
                                          create_entity =   create_entity)
309
            if rejected:
310
                raise NoEntityError("No target entity '%s'" % (target,))
311

    
312
            t = Entity.objects.get(entity=target)
313
        else:
314
            if t.key != key:
315
                m = "Invalid key for target entity '%s'" % (target,)
316
                raise InvalidKeyError(m)
317

    
318
        create = Commission.objects.create
319
        commission = create(entity_id=target, clientkey=clientkey, name=name)
320
        serial = commission.serial
321

    
322
        for entity, resource, quantity in provisions:
323
            release = 0
324
            if quantity < 0:
325
                release = 1
326

    
327
            try:
328
                h = Holding.objects.get(entity=entity, resource=resource)
329
            except Holding.DoesNotExist:
330
                m = ("There is not enough quantity "
331
                     "to allocate from in %s.%s" % (entity, resource))
332
                raise NoQuantityError(m)
333

    
334
            hp = h.policy
335

    
336
            if (hp.export_limit is not None and
337
                h.exporting + quantity > hp.export_limit):
338
                    m = ("Export limit reached for %s.%s" % (entity, resource))
339
                    raise ExportLimitError(m)
340

    
341
            if hp.quantity is not None:
342
                available = (+ hp.quantity + h.imported + h.returned
343
                             - h.exporting - h.releasing)
344

    
345
                if available - quantity < 0:
346
                    m = ("There is not enough quantity "
347
                         "to allocate from in %s.%s" % (entity, resource))
348
                    raise NoQuantityError(m)
349

    
350
            try:
351
                th = Holding.objects.get(entity=target, resource=resource)
352
            except Holding.DoesNotExist:
353
                m = ("There is not enough capacity "
354
                     "to allocate into in %s.%s" % (target, resource))
355
                raise NoCapacityError(m)
356

    
357
            tp = th.policy
358

    
359
            if (tp.import_limit is not None and
360
                th.importing + quantity > tp.import_limit):
361
                    m = ("Import limit reached for %s.%s" % (target, resource))
362
                    raise ImportLimitError(m)
363

    
364
            if tp.capacity is not None:
365
                capacity = (+ tp.capacity + th.exported + th.released
366
                            - th.importing - th.returning)
367

    
368
                if capacity - quantity < 0:
369
                        m = ("There is not enough capacity "
370
                             "to allocate into in %s.%s" % (target, resource))
371
                        raise NoCapacityError(m)
372

    
373
            Provision.objects.create(   serial      =   commission,
374
                                        entity      =   t,
375
                                        resource    =   resource,
376
                                        quantity    =   quantity   )
377
            if release:
378
                h.returning -= quantity
379
                th.releasing -= quantity
380
            else:
381
                h.exporting += quantity
382
                th.importing += quantity
383

    
384
            h.save()
385
            th.save()
386

    
387
        return serial
388

    
389
    def _log_provision(self, commission, s_holding, t_holding,
390
                             provision, log_time, reason):
391

    
392
        s_entity = s_holding.entity
393
        s_policy = s_holding.policy
394
        t_entity = t_holding.entity
395
        t_policy = t_holding.policy
396

    
397
        ProvisionLog.objects.create(
398
                        serial              =   commission.serial,
399
                        name                =   commission.name,
400
                        source              =   s_entity.entity,
401
                        target              =   t_entity.entity,
402
                        resource            =   provision.resource,
403
                        source_quantity     =   s_policy.quantity,
404
                        source_capacity     =   s_policy.capacity,
405
                        source_import_limit =   s_policy.import_limit,
406
                        source_export_limit =   s_policy.export_limit,
407
                        source_imported     =   s_holding.imported,
408
                        source_exported     =   s_holding.exported,
409
                        source_returned     =   s_holding.returned,
410
                        source_released     =   s_holding.released,
411
                        target_quantity     =   t_policy.quantity,
412
                        target_capacity     =   t_policy.capacity,
413
                        target_import_limit =   t_policy.import_limit,
414
                        target_export_limit =   t_policy.export_limit,
415
                        target_imported     =   t_holding.imported,
416
                        target_exported     =   t_holding.exported,
417
                        target_returned     =   t_holding.returned,
418
                        target_released     =   t_holding.released,
419
                        delta_quantity      =   provision.quantity,
420
                        issue_time          =   commission.issue_time,
421
                        log_time            =   log_time,
422
                        reason              =   reason)
423

    
424
    def accept_commission(self, context={}, clientkey=None,
425
                                serials=(), reason=''):
426
        log_time = now()
427

    
428
        for serial in serials:
429
            try:
430
                c = Commission.objects.get(clientkey=clientkey, serial=serial)
431
            except Commission.DoesNotExist:
432
                return
433

    
434
            t = c.entity
435

    
436
            provisions = Provision.objects.filter(serial=serial)
437
            for pv in provisions:
438
                try:
439
                    h = Holding.objects.get(entity=pv.entity.entity,
440
                                            resource=pv.resource    )
441
                    th = Holding.objects.get(entity=t, resource=pv.resource)
442
                except Holding.DoesNotExist:
443
                    m = "Corrupted provision"
444
                    raise CorruptedError(m)
445

    
446
                quantity = pv.quantity
447
                release = 0
448
                if quantity < 0:
449
                    release = 1
450

    
451
                if release:
452
                    h.returned -= quantity
453
                    th.released -= quantity
454
                else:
455
                    h.exported += quantity
456
                    th.imported += quantity
457

    
458
                reason = 'ACCEPT:' + reason[-121:]
459
                self._log_provision(c, h, th, pv, log_time, reason)
460
                h.save()
461
                th.save()
462
                pv.delete()
463

    
464
        return
465

    
466
    def reject_commission(self, context={}, clientkey=None,
467
                                serials=(), reason=''):
468
        log_time = now()
469

    
470
        for serial in serials:
471
            try:
472
                c = Commission.objects.get(clientkey=clientkey, serial=serial)
473
            except Commission.DoesNotExist:
474
                return
475

    
476
            t = c.entity
477

    
478
            provisions = Provision.objects.filter(serial=serial)
479
            for pv in provisions:
480
                try:
481
                    h = Holding.objects.get(entity=pv.entity.entity,
482
                                            resource=pv.resource)
483
                    th = Holding.objects.get(entity=t, resource=pv.resource)
484
                except Holding.DoesNotExist:
485
                    m = "Corrupted provision"
486
                    raise CorruptedError(m)
487

    
488
                quantity = pv.quantity
489
                release = 0
490
                if quantity < 0:
491
                    release = 1
492

    
493
                if release:
494
                    h.returning += quantity
495
                    th.releasing += quantity
496
                else:
497
                    h.exporting -= quantity
498
                    th.importing -= quantity
499

    
500
                reason = 'REJECT:' + reason[-121:]
501
                self._log_provision(c, h, th, pv, log_time, reason)
502
                h.save()
503
                th.save()
504
                pv.delete()
505

    
506
        return
507

    
508
    def get_pending_commissions(self, context={}, clientkey=None):
509
        pending = Commission.objects.filter(clientkey=clientkey)\
510
                                    .values_list('serial', flat=True)
511
        return pending
512

    
513
    def resolve_pending_commissions(self,   context={}, clientkey=None,
514
                                            max_serial=None, accept_set=()  ):
515
        accept_set = set(accept_set)
516
        pending = self.get_pending_commissions(clientkey=clientkey)
517
        pending = sorted(pending)
518

    
519
        accept = self.accept_commission
520
        reject = self.reject_commission
521

    
522
        for serial in pending:
523
            if serial > max_serial:
524
                break
525

    
526
            if serial in accept_set:
527
                accept(clientkey=clientkey, serial=serial)
528
            else:
529
                reject(clientkey=clientkey, serial=serial)
530

    
531
        return
532

    
533
    def release_entity(self, context={}, release_entity=()):
534
        rejected = []
535
        append = rejected.append
536
        for entity, key in release_entity:
537
            try:
538
                e = Entity.objects.get(entity=entity, key=key)
539
            except Entity.DoesNotExist:
540
                append(entity)
541
                continue
542

    
543
            if e.entities.count() != 0:
544
                append(entity)
545
                continue
546

    
547
            if e.holding_set.count() != 0:
548
                append(entity)
549
                continue
550

    
551
            e.delete()
552

    
553
        return rejected
554

    
555
    def get_timeline(self, context={}, after="", before="Z", get_timeline=()):
556
        entity_set = set()
557
        e_add = entity_set.add
558
        resource_set = set()
559
        r_add = resource_set.add
560

    
561
        for entity, resource, key in get_timeline:
562
            if entity not in entity_set:
563
                try:
564
                    e = Entity.objects.get(entity=entity, key=key)
565
                    e_add(entity)
566
                except Entity.DoesNotExist:
567
                    continue
568

    
569
            r_add((entity, resource))
570

    
571
        chunk_size = 65536
572
        nr = 0
573
        timeline = []
574
        append = timeline.append
575
        filterlogs = ProvisionLog.objects.filter
576
        if entity_set:
577
            q_entity = Q(source__in = entity_set) | Q(target__in = entity_set)
578
        else:
579
            q_entity = Q()
580

    
581
        while 1:
582
            logs = filterlogs(  q_entity,
583
                                issue_time__gt      =   after,
584
                                issue_time__lte     =   before,
585
                                reason__startswith  =   'ACCEPT:'   )
586

    
587
            logs = logs.order_by('issue_time')
588
            #logs = logs.values()
589
            logs = logs[:chunk_size]
590
            nr += len(logs)
591
            if not logs:
592
                break
593
            for g in logs:
594
                if ((g.source, g.resource) not in resource_set
595
                    or (g.target, g.resource) not in resource_set):
596
                        continue
597

    
598
                o = {
599
                    'serial'                    :   g.serial,
600
                    'source'                    :   g.source,
601
                    'target'                    :   g.target,
602
                    'resource'                  :   g.resource,
603
                    'name'                      :   g.name,
604
                    'quantity'                  :   g.delta_quantity,
605
                    'source_allocated'          :   g.source_allocated(),
606
                    'source_allocated_through'  :   g.source_allocated_through(),
607
                    'source_inbound'            :   g.source_inbound(),
608
                    'source_inbound_through'    :   g.source_inbound_through(),
609
                    'source_outbound'           :   g.source_outbound(),
610
                    'source_outbound_through'   :   g.source_outbound_through(),
611
                    'target_allocated'          :   g.target_allocated(),
612
                    'target_allocated_through'  :   g.target_allocated_through(),
613
                    'target_inbound'            :   g.target_inbound(),
614
                    'target_inbound_through'    :   g.target_inbound_through(),
615
                    'target_outbound'           :   g.target_outbound(),
616
                    'target_outbound_through'   :   g.target_outbound_through(),
617
                    'issue_time'                :   g.issue_time,
618
                    'log_time'                  :   g.log_time,
619
                    'reason'                    :   g.reason,
620
                }
621
                    
622
                append(o)
623

    
624
            after = g.issue_time
625
            if after >= before:
626
                break
627

    
628
        return timeline
629

    
630

    
631
API_Callpoint = QuotaholderDjangoDBCallpoint
632