Statistics
| Branch: | Tag: | Revision:

root / commissioning / servers / quotaholder / django_backend / callpoint.py @ a2db0eb5

History | View | Annotate | Download (19.1 kB)

1

    
2
from commissioning import ( QuotaholderAPI,
3
                            Callpoint, CommissionException,
4
                            CorruptedError, InvalidDataError,
5
                            InvalidKeyError, NoEntityError,
6
                            NoQuantityError, NoCapacityError,
7
                            ExportLimitError, ImportLimitError)
8

    
9

    
10
from commissioning.utils.newname import newname
11
from django.db.models import Model, BigIntegerField, CharField, ForeignKey, Q
12
from django.db import transaction, IntegrityError
13
from .models import (Holder, Entity, Policy, Holding,
14
                     Commission, Provision, ProvisionLog, now)
15

    
16

    
17
class QuotaholderDjangoDBCallpoint(Callpoint):
18

    
19
    api_spec = QuotaholderAPI()
20

    
21
    http_exc_lookup = {
22
        CorruptedError:   550,
23
        InvalidDataError: 400,
24
        InvalidKeyError:  401,
25
        NoEntityError:    404,
26
        NoQuantityError:  413,
27
        NoCapacityError:  413,
28
    }
29

    
30
    def init_connection(self, connection):
31
        if connection is not None:
32
            raise ValueError("Cannot specify connection args with %s" %
33
                             type(self).__name__)
34
        pass
35

    
36
    def commit(self):
37
        transaction.commit()
38

    
39
    def rollback(self):
40
        transaction.rollback()
41

    
42
    def do_make_call(self, call_name, data):
43
        call_fn = getattr(self, call_name, None)
44
        if not call_fn:
45
            m = "cannot find call '%s'" % (call_name,)
46
            raise CorruptedError(m)
47

    
48
        return call_fn(**data)
49

    
50
    def create_entity(self, context={}, create_entity=()):
51
        rejected = []
52
        append = rejected.append
53

    
54
        for entity, owner, key, ownerkey in create_entity:
55
            try:
56
                owner = Entity.objects.get(entity=owner, key=ownerkey)
57
            except Entity.DoesNotExist:
58
                append(entity)
59
                continue
60

    
61
            try:
62
                e = Entity.objects.get(entity=entity, owner=owner)
63
                append(entity)
64
            except Entity.DoesNotExist:
65
                e = Entity.objects.create(entity=entity,
66
                                          owner=owner,
67
                                          key=key)
68
        return rejected
69

    
70
    def set_entity_key(self, context={}, set_entity_key=()):
71
        rejected = []
72
        append = rejected.append
73

    
74
        for entity, key, newkey in set_entity_key:
75
            try:
76
                e = Entity.objects.get(entity=entity, key=key)
77
            except Entity.DoesNotExist:
78
                append(entity)
79

    
80
            e.key = newkey
81
            e.save()
82

    
83
        return rejected
84

    
85
    def list_entities(self, context={}, entity=None, key=None):
86
        try:
87
            e = Entity.objects.get(entity=entity, key=key)
88
        except Entity.DoesNotExist:
89
            m = "Entity '%s' does not exist" % (entity,)
90
            raise NoEntityError(m)
91

    
92
        children = e.entities.all()
93
        entities = [e.entity for e in children]
94
        return entities
95

    
96
    def get_entity(self, context={}, get_entity=()):
97
        entities = []
98
        append = entities.append
99

    
100
        for entity, key in get_entity:
101
            try:
102
                Entity.objects.get(entity=entity, key=key)
103
            except Entity.DoesNotExist:
104
                continue
105

    
106
            append((entity, key))
107

    
108
        return entities
109

    
110
    def get_limits(self, context={}, get_limits=()):
111
        limits = []
112
        append = limits.append
113

    
114
        for entity, resource, key in get_limits:
115
            try:
116
                h = Holding.objects.get(entity=entity, resource=resource)
117
            except Policy.DoesNotExist:
118
                continue
119

    
120
            if h.entity.key != key:
121
                continue
122
            p = h.policy
123
            append((h.entity, h.resource, p.quantity, p.capacity,
124
                    p.import_limit, p.export_limit, h.flags))
125

    
126
        return limits
127

    
128
    def set_limits(self, context={}, set_limits=()):
129

    
130
        for (   policy, quantity, capacity,
131
                import_limit, export_limit  ) in set_limits:
132

    
133
                #XXX: create or replace?
134
                Policy.objects.create(  policy=policy,
135
                                        quantity=quantity,
136
                                        capacity=capacity,
137
                                        import_limit=import_limit,
138
                                        export_limit=export_limit   )
139

    
140
        return ()
141

    
142
    def get_holding(self, context={}, get_holding=()):
143
        holdings = []
144
        append = holdings.append
145

    
146
        for entity, resource, key in get_holding:
147
            try:
148
                h = Holding.objects.get(entity=entity, resource=resource)
149
            except Holding.DoesNotExist:
150
                continue
151

    
152
            if h.entity.key != key:
153
                continue
154

    
155
            append((h.entity.entity, h.resource, h.policy,
156
                    h.imported, h.exported, h.flags))
157

    
158
        return holdings
159

    
160
    def set_holding(self, context={}, set_holding=()):
161
        rejected = []
162
        append = rejected.append
163

    
164
        for entity, resource, key, policy, flags in set_holding:
165
            try:
166
                e = Entity.objects.get(entity=entity, key=key)
167
            except Entity.DoesNotExist:
168
                append((entity, resource, policy))
169
                continue
170

    
171
            if e.key != key:
172
                append((entity, resource, policy))
173
                continue
174

    
175
            try:
176
                p = Policy.objects.get(policy=policy)
177
            except Policy.DoesNotExist:
178
                append((entity, resource, policy))
179
                continue
180

    
181
            try:
182
                h = Holding.objects.get(entity=entity, resource=resource)
183
                h.policy = p
184
                h.flags = flags
185
                h.save()
186
            except Holding.DoesNotExist:
187
                h = Holding.objects.create( entity=entity, resource=resource,
188
                                            policy=policy, flags=flags      )
189

    
190
        return rejected
191

    
192
    def list_resources(self, context={}, entity=None, key=None):
193
        try:
194
            e = Entity.objects.get()
195
        except Entity.DoesNotExist:
196
            m = "No such entity '%s'" % (entity,)
197
            raise NoEntityError(m)
198

    
199
        if e.key != key:
200
            m = "Invalid key for entity '%s'" % (entity,)
201
            raise InvalidKeyError(m)
202

    
203
        holdings = e.holding_set.filter(entity=entity)
204
        resources = [h.resource for h in holdings]
205
        return resources
206

    
207
    def get_quota(self, context={}, get_quota=()):
208
        quotas = []
209
        append = quotas.append
210

    
211
        for entity, resource, key in get_quota:
212
            try:
213
                h = Holding.objects.get(entity=entity, resource=resource)
214
            except Holding.DoesNotExist:
215
                continue
216

    
217
            if h.entity.key != key:
218
                continue
219

    
220
            p = h.policy
221

    
222
            append((h.entity.entity, h.resource, p.quantity, p.capacity,
223
                    p.import_limit, p.export_limit,
224
                    h.imported, h.exported, h.flags))
225

    
226
        return quotas
227

    
228
    def set_quota(self, context={}, set_quota=()):
229
        rejected = []
230
        append = rejected.append
231

    
232
        for (   entity, resource, key,
233
                quantity, capacity,
234
                import_limit, export_limit, flags  ) in set_quota:
235

    
236
                p = None
237

    
238
                try:
239
                    h = Holding.objects.get(entity=entity, resource=resource)
240
                    if h.entity.key != key:
241
                        append((entity, resource))
242
                        continue
243
                    p = h.policy
244

    
245
                except Holding.DoesNotExist:
246
                    try:
247
                        e = Entity.objects.get(entity=entity)
248
                    except Entity.DoesNotExist:
249
                        append((entity, resource))
250
                        continue
251

    
252
                    if e.key != key:
253
                        append((entity, resource))
254
                        continue
255

    
256
                    h = None
257

    
258
                policy = newname('policy_')
259
                newp = Policy   (
260
                            policy=policy,
261
                            quantity=quantity,
262
                            capacity=capacity,
263
                            import_limit=import_limit,
264
                            export_limit=export_limit
265
                )
266

    
267
                if h is None:
268
                    h = Holding(entity=e, resource=resource,
269
                                policy=newp, flags=flags)
270
                else:
271
                    h.policy = newp
272
                    h.flags = flags
273

    
274
                h.save()
275
                newp.save()
276

    
277
                if p is not None and p.holding_set.count() == 0:
278
                    p.delete()
279

    
280
        return rejected
281

    
282
    def issue_commission(self,  context     =   {},
283
                                clientkey   =   None,
284
                                target      =   None,
285
                                key         =   None,
286
                                owner       =   None,
287
                                ownerkey    =   None,
288
                                provisions  =   ()  ):
289

    
290
        try:
291
            t = Entity.objects.get(entity=target)
292
        except Entity.DoesNotExist:
293
            create_entity = ((target, owner, key, ownerkey),)
294
            rejected = self.create_entity(context       =   context,
295
                                          create_entity =   create_entity)
296
            if rejected:
297
                raise NoEntityError("No target entity '%s'" % (target,))
298

    
299
            t = Entity.objects.get(entity=target)
300
        else:
301
            if t.key != key:
302
                m = "Invalid key for target entity '%s'" % (target,)
303
                raise InvalidKeyError(m)
304

    
305
        create = Commission.objects.create
306
        commission = create(entity_id=target, clientkey=clientkey)
307
        serial = commission.serial
308

    
309
        for entity, resource, quantity in provisions:
310
            release = 0
311
            if quantity < 0:
312
                release = 1
313

    
314
            try:
315
                h = Holding.objects.get(entity=entity, resource=resource)
316
            except Holding.DoesNotExist:
317
                m = ("There is not enough quantity "
318
                     "to allocate from in %s.%s" % (entity, resource))
319
                raise NoQuantityError(m)
320

    
321
            hp = h.policy
322

    
323
            if (hp.export_limit is not None and
324
                h.exporting + quantity > hp.export_limit):
325
                    m = ("Export limit reached for %s.%s" % (entity, resource))
326
                    raise ExportLimitError(m)
327

    
328
            available = (+ hp.quantity + h.imported + h.regained
329
                         - h.exporting - h.releasing)
330

    
331
            if available - quantity < 0:
332
                m = ("There is not enough quantity "
333
                     "to allocate from in %s.%s" % (entity, resource))
334
                raise NoQuantityError(m)
335

    
336
            try:
337
                th = Holding.objects.get(entity=target, resource=resource)
338
            except Holding.DoesNotExist:
339
                m = ("There is not enough capacity "
340
                     "to allocate into in %s.%s" % (target, resource))
341
                raise NoCapacityError(m)
342

    
343
            tp = th.policy
344

    
345
            if (tp.import_limit is not None and
346
                th.importing + quantity > tp.import_limit):
347
                    m = ("Import limit reached for %s.%s" % (target, resource))
348
                    raise ImportLimitError(m)
349

    
350
            capacity = (+ tp.capacity + th.exported + th.released
351
                        - th.importing - th.regaining)
352

    
353
            if capacity - quantity < 0:
354
                    m = ("There is not enough capacity "
355
                         "to allocate into in %s.%s" % (target, resource))
356
                    raise NoCapacityError(m)
357

    
358
            Provision.objects.create(   serial=serial,
359
                                        entity=entity,
360
                                        resource=resource,
361
                                        quantity=quantity   )
362

    
363
            if release:
364
                h.regaining -= quantity
365
                th.releasing -= quantity
366
            else:
367
                h.exporting += quantity
368
                th.importing += quantity
369

    
370
            h.save()
371
            th.save()
372

    
373
        return serial
374

    
375
    def _log_provision(self, serial, s_holding, t_holding,
376
                             provision, log_time, reason):
377

    
378
        source_allocated = s_holding.exported - s_holding.regained
379
        source_available = (+ s_holding.policy.quantity + s_holding.imported
380
                            - s_holding.released - source_allocated)
381
        target_allocated = t_holding.exported - t_holding.regained
382
        target_available = (+ t_holding.policy.quantity + t_holding.imported
383
                            - t_holding.released - target_allocated)
384

    
385
        ProvisionLog.objects.create(
386
                        serial              =   serial,
387
                        source              =   s_holding.entity.entity,
388
                        target              =   t_holding.entity.entity,
389
                        resource            =   provision.resource,
390
                        source_available    =   source_available,
391
                        source_allocated    =   source_allocated,
392
                        target_available    =   target_available,
393
                        target_allocated    =   target_allocated,
394
                        delta_quantity      =   provision.quantity,
395
                        issue_time          =   provision.issue_time,
396
                        log_time            =   log_time,
397
                        reason              =   reason)
398

    
399
    def accept_commission(self, context={}, clientkey=None,
400
                                serials=(), reason=''):
401
        for serial in serials:
402
            try:
403
                c = Commission.objects.get(clientkey=clientkey, serial=serial)
404
            except Commission.DoesNotExist:
405
                return
406

    
407
            t = c.entity
408
            log_time = now()
409

    
410
            provisions = Provision.objects.filter(serial=serial)
411
            for pv in provisions:
412
                try:
413
                    h = Holding.objects.get(entity=pv.entity.entity,
414
                                            resource=pv.resource    )
415
                    th = Holding.objects.get(entity=t, resource=pv.resource)
416
                except Holding.DoesNotExist:
417
                    m = "Corrupted provision"
418
                    raise CorruptedError(m)
419

    
420
                quantity = pv.quantity
421
                release = 0
422
                if quantity < 0:
423
                    release = 1
424

    
425
                if release:
426
                    h.regained -= quantity
427
                    th.released -= quantity
428
                else:
429
                    h.exported += quantity
430
                    th.imported += quantity
431

    
432
                reason = 'ACCEPT:' + reason[-121:]
433
                self._log_provision(serial, h, th, pv, log_time, reason)
434
                h.save()
435
                th.save()
436
                pv.delete()
437

    
438
        return
439

    
440
    def reject_commission(self, context={}, clientkey=None,
441
                                serials=(), reason=''):
442
        for serial in serials:
443
            try:
444
                c = Commission.objects.get(clientkey=clientkey, serial=serial)
445
            except Commission.DoesNotExist:
446
                return
447

    
448
            t = c.entity
449

    
450
            provisions = Provision.objects.filter(serial=serial)
451
            for pv in provisions:
452
                try:
453
                    h = Holding.objects.get(entity=pv.entity.entity,
454
                                            resource=pv.resource)
455
                    th = Holding.objects.get(entity=t, resource=pv.resource)
456
                except Holding.DoesNotExist:
457
                    m = "Corrupted provision"
458
                    raise CorruptedError(m)
459

    
460
                quantity = pv.quantity
461
                release = 0
462
                if quantity < 0:
463
                    release = 1
464

    
465
                if release:
466
                    h.regaining += quantity
467
                    th.releasing += quantity
468
                else:
469
                    h.exporting -= quantity
470
                    th.importing -= quantity
471

    
472
                source_allocated = h.exported - h.regained
473
                source_available = (+ h.policy.quantity + h.imported
474
                        - h.released - source_allocated)
475
                target_allocated = th.exported - th.regained
476
                target_available = (+ th.policy.quantity + th.imported
477
                        - th.released - target_allocated)
478

    
479
                reason = 'REJECT:' + reason[-121:]
480
                self._log_provision(serial, h, th, pv, log_time, reason)
481
                h.save()
482
                th.save()
483
                pv.delete()
484

    
485
        return
486

    
487
    def get_pending_commissions(self, context={}, clientkey=None):
488
        pending = Commission.objects.filter(clientkey=clientkey)
489
        return pending
490

    
491
    def resolve_pending_commissions(self,   context={}, clientkey=None,
492
                                            max_serial=None, accept_set=()  ):
493
        accept_set = set(accept_set)
494
        pending = self.get_pending_commissions(clientkey=clientkey)
495
        pending = sorted(pending)
496

    
497
        accept = self.accept_commission
498
        reject = self.reject_commission
499

    
500
        for serial in pending:
501
            if serial > max_serial:
502
                break
503

    
504
            if serial in accept_set:
505
                accept(clientkey=clientkey, serial=serial)
506
            else:
507
                reject(clientkey=clientkey, serial=serial)
508

    
509
        return
510

    
511
    def release_entity(self, context={}, release_entity=()):
512
        rejected = []
513
        append = rejected.append
514
        for entity, key in release_entity:
515
            try:
516
                e = Entity.objects.get(entity=entity, key=key)
517
            except Entity.DoesNotExist:
518
                append(entity)
519
                continue
520

    
521
            if e.entities.count() != 0:
522
                append(entity)
523
                continue
524

    
525
            if e.holdings.count() != 0:
526
                append(entity)
527
                continue
528

    
529
            e.delete()
530

    
531
        return rejected
532

    
533
    def get_timeline(self, context={}, after="", before="Z", entities=()):
534
        entity_set = set()
535
        add = entity_set.add
536

    
537
        for entity, key in entities:
538
            try:
539
                e = Entity.objects.get(entity=entity, key=key)
540
                add(entity)
541
            except Entity.DoesNotExist:
542
                continue
543

    
544
        chunk_size = 65536
545
        nr = 0
546
        timeline = []
547
        extend = timeline.extend
548
        filterlogs = ProvisionLog.objects.filter
549
        if entity_set:
550
            q_entity = Q(source__in = entity_set) | Q(target__in = entity_set)
551
        else:
552
            q_entity = Q()
553

    
554
        while 1:
555
            logs = filterlogs(  q_entity,
556
                                issue_time__gt      =   after,
557
                                issue_time__lte     =   before,
558
                                reason__startswith  =   'ACCEPT:'   )
559

    
560
            logs = logs.order_by('issue_time')
561
            logs = logs.values()
562
            logs = logs[:chunk_size]
563
            nr += len(logs)
564
            if not logs:
565
                break
566
            extend(logs)
567
            after = logs[-1]['issue_time']
568
            if after >= before:
569
                break
570

    
571
        return timeline
572

    
573

    
574
API_Callpoint = QuotaholderDjangoDBCallpoint
575