Revision 5d996aea

b/snf-quotaholder-app/quotaholder_django/quotaholder_app/callpoint.py
45 45
from django.db.models import Q
46 46
from django.db import transaction, IntegrityError
47 47
from .models import (Holder, Entity, Policy, Holding,
48
                     Commission, Provision, ProvisionLog, now)
48
                     Commission, Provision, ProvisionLog, now,
49
                     db_get_entity, db_get_holding, db_get_policy,
50
                     db_get_commission, db_filter_provision)
49 51

  
50 52

  
51 53
class QuotaholderDjangoDBCallpoint(Callpoint):
......
107 109

  
108 110
        for entity, key, newkey in set_entity_key:
109 111
            try:
110
                e = Entity.objects.get(entity=entity, key=key)
112
                e = db_get_entity(entity=entity, key=key, for_update=True)
111 113
            except Entity.DoesNotExist:
112 114
                append(entity)
113 115
                continue
......
163 165
                import_limit, export_limit  ) in set_limits:
164 166

  
165 167
                try:
166
                    policy = Policy.objects.get(policy=policy)
168
                    policy = db_get_policy(policy=policy, for_update=True)
167 169
                except Policy.DoesNotExist:
168 170
                    Policy.objects.create(  policy=policy,
169 171
                                            quantity=quantity,
......
200 202

  
201 203
    def _set_holding(self, entity, resource, policy, flags):
202 204
        try:
203
            h = Holding.objects.get(entity=entity, resource=resource)
205
            h = db_get_holding(entity=entity, resource=resource,
206
                               for_update=True)
204 207
            h.policy = p
205 208
            h.flags = flags
206 209
            h.save()
......
231 234
                continue
232 235

  
233 236
            try:
234
                h = Holding.objects.get(entity=entity, resource=resource)
237
                h = db_get_holding(entity=entity, resource=resource,
238
                                   for_update=True)
235 239
                h.policy = p
236 240
                h.flags = flags
237 241
                h.save()
......
245 249
                          imported, exported, returned, released,
246 250
                          flags):
247 251
        try:
248
            h = Holding.objects.get(entity=entity, resource=resource)
252
            h = db_get_holding(entity=entity, resource=resource,
253
                               for_update=True)
249 254
        except Holding.DoesNotExist:
250 255
            h = Holding(entity=entity, resource=resource)
251 256

  
......
305 310
                continue
306 311

  
307 312
            try:
308
                h = Holding.objects.get(entity=entity, resource=resource)
313
                h = db_get_holding(entity=entity, resource=resource,
314
                                   for_update=True)
309 315
                h.imported=imported
310 316
                h.importing=imported
311 317
                h.exported=exported
......
341 347

  
342 348
    def _increase_resource(self, entity, resource, amount):
343 349
        try:
344
            h = Holding.objects.get(entity=entity, resource=resource)
350
            h = db_get_holding(entity=entity, resource=resource,
351
                               for_update=True)
345 352
        except Holding.DoesNotExist:
346 353
            h = Holding(entity=entity, resource=resource)
347 354
            p = Policy.objects.create(policy=self._new_policy_name(),
......
356 363

  
357 364
        for idx, (entity, resource, key) in enumerate(release_holding):
358 365
            try:
359
                h = Holding.objects.get(entity=entity, resource=resource)
366
                h = db_get_holding(entity=entity, resource=resource,
367
                                   for_update=True)
360 368
            except Holding.DoesNotExist:
361 369
                append(idx)
362 370
                continue
......
462 470
                )
463 471

  
464 472
                try:
465
                    h = Holding.objects.get(entity=entity, resource=resource)
473
                    h = db_get_holding(entity=entity, resource=resource,
474
                                       for_update=True)
466 475
                    p = h.policy
467 476
                    h.policy = newp
468 477
                    h.flags = flags
......
528 537
                release = 1
529 538

  
530 539
            try:
531
                h = Holding.objects.get(entity=entity, resource=resource)
540
                h = db_get_holding(entity=entity, resource=resource,
541
                                   for_update=True)
532 542
            except Holding.DoesNotExist:
533 543
                m = ("There is not enough quantity "
534 544
                     "to allocate from in %s.%s" % (entity, resource))
......
551 561
                    raise NoQuantityError(m)
552 562

  
553 563
            try:
554
                th = Holding.objects.get(entity=target, resource=resource)
564
                th = db_get_holding(entity=target, resource=resource,
565
                                    for_update=True)
555 566
            except Holding.DoesNotExist:
556 567
                m = ("There is not enough capacity "
557 568
                     "to allocate into in %s.%s" % (target, resource))
......
630 641

  
631 642
        for serial in serials:
632 643
            try:
633
                c = Commission.objects.get(clientkey=clientkey, serial=serial)
644
                c = db_get_commission(clientkey=clientkey, serial=serial,
645
                                      for_update=True)
634 646
            except Commission.DoesNotExist:
635 647
                return
636 648

  
637 649
            t = c.entity
638 650

  
639
            provisions = Provision.objects.filter(serial=serial)
651
            provisions = db_filter_provision(serial=serial, for_update=True)
640 652
            for pv in provisions:
641 653
                try:
642
                    h = Holding.objects.get(entity=pv.entity.entity,
643
                                            resource=pv.resource    )
644
                    th = Holding.objects.get(entity=t, resource=pv.resource)
654
                    h = db_get_holding(entity=pv.entity.entity,
655
                                       resource=pv.resource, for_update=True)
656
                    th = db_get_holding(entity=t, resource=pv.resource,
657
                                        for_update=True)
645 658
                except Holding.DoesNotExist:
646 659
                    m = "Corrupted provision"
647 660
                    raise CorruptedError(m)
......
673 686

  
674 687
        for serial in serials:
675 688
            try:
676
                c = Commission.objects.get(clientkey=clientkey, serial=serial)
689
                c = db_get_commission(clientkey=clientkey, serial=serial,
690
                                      for_update=True)
677 691
            except Commission.DoesNotExist:
678 692
                return
679 693

  
680 694
            t = c.entity
681 695

  
682
            provisions = Provision.objects.filter(serial=serial)
696
            provisions = db_filter_provision(serial=serial, for_update=True)
683 697
            for pv in provisions:
684 698
                try:
685
                    h = Holding.objects.get(entity=pv.entity.entity,
686
                                            resource=pv.resource)
687
                    th = Holding.objects.get(entity=t, resource=pv.resource)
699
                    h = db_get_holding(entity=pv.entity.entity,
700
                                       resource=pv.resource, for_update=True)
701
                    th = db_get_holding(entity=t, resource=pv.resource,
702
                                        for_update=True)
688 703
                except Holding.DoesNotExist:
689 704
                    m = "Corrupted provision"
690 705
                    raise CorruptedError(m)
......
740 755
        append = rejected.append
741 756
        for entity, key in release_entity:
742 757
            try:
743
                e = Entity.objects.get(entity=entity, key=key)
758
                e = db_get_entity(entity=entity, key=key, for_update=True)
744 759
            except Entity.DoesNotExist:
745 760
                append(entity)
746 761
                continue
b/snf-quotaholder-app/quotaholder_django/quotaholder_app/managers.py
1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

  
30
from django.db import connections
31
from django.db.models import Manager
32
from django.db.models.query import QuerySet
33

  
34

  
35
class ForUpdateManager(Manager):
36
    """ Model manager implementing SELECT .. FOR UPDATE statement
37

  
38
        This manager implements select_for_update() method in order to use
39
        row-level locking in the database and guarantee exclusive access, since
40
        this method is only implemented in Django>=1.4.
41

  
42
        Non-blocking reads are not implemented, and each query including a row
43
        that is locked by another transaction will block until the lock is
44
        released. Also care must be taken in order to avoid deadlocks or retry
45
        transactions that abort due to deadlocks.
46

  
47
        Example:
48
            networks = Network.objects.select_for_update().filter(public=True)
49

  
50
    """
51

  
52
    def __init__(self, *args, **kwargs):
53
        super(ForUpdateManager, self).__init__(*args, **kwargs)
54
        self._select_for_update = False
55

  
56
    def filter(self, *args, **kwargs):
57
        query = self.get_query_set().filter(*args, **kwargs)
58
        if self._select_for_update:
59
            self._select_for_update = False
60
            return for_update(query)
61
        else:
62
            return query
63

  
64
    def get(self, *args, **kwargs):
65
        if not self._select_for_update:
66
            return self.get_query_set().get(*args, **kwargs)
67

  
68
        query = self.filter(*args, **kwargs)
69
        query = list(query)
70
        num = len(query)
71
        if num == 1:
72
            return query[0]
73
        if not num:
74
            raise self.model.DoesNotExist(
75
                    "%s matching query does not exist. "
76
                    "Lookup parameters were %s" %
77
                    (self.model._meta.object_name, kwargs))
78
        raise self.model.MultipleObjectsReturned(
79
            "get() returned more than one %s -- it returned %s! "
80
            "Lookup parameters were %s" %
81
            (self.model._meta.object_name, num, kwargs))
82

  
83
    def select_for_update(self, *args, **kwargs):
84
        self._select_for_update = True
85
        return self
86

  
87

  
88
def for_update(query):
89
    """ Rewrite query using SELECT .. FOR UPDATE.
90

  
91
    """
92
    if 'sqlite' in connections[query.db].settings_dict['ENGINE'].lower():
93
        # SQLite  does not support FOR UPDATE
94
        return query
95
    sql, params = query.query.get_compiler(query.db).as_sql()
96
    return query.model._default_manager.raw(sql.rstrip() + ' FOR UPDATE',
97
                                            params)
98

  
99

  
100
class ProtectedDeleteManager(ForUpdateManager):
101
    """ Manager for protecting Backend deletion.
102

  
103
        Call Backend delete() method in order to prevent deletion
104
        of Backends that host non-deleted VirtualMachines.
105

  
106
    """
107

  
108
    def get_query_set(self):
109
        return BackendQuerySet(self.model, using=self._db)
110

  
111

  
112
class BackendQuerySet(QuerySet):
113
    def delete(self):
114
        for backend in self._clone():
115
            backend.delete()
b/snf-quotaholder-app/quotaholder_django/quotaholder_app/models.py
37 37
from django.db.models import (Model, BigIntegerField, CharField,
38 38
                              ForeignKey, AutoField)
39 39
from django.db import transaction
40

  
40
from .managers import ForUpdateManager
41 41

  
42 42
class Holder(Model):
43 43

  
......
45 45
    intval      =   BigIntegerField()
46 46
    strval      =   CharField(max_length=4096)
47 47

  
48
    objects     =   ForUpdateManager()
48 49

  
49 50
class Entity(Model):
50 51

  
......
53 54
                               related_name='entities')
54 55
    key         =   CharField(max_length=4096, null=False)
55 56

  
57
    objects     =   ForUpdateManager()
56 58

  
57 59
class Policy(Model):
58 60

  
......
62 64
    import_limit    =   BigIntegerField(null=True,  default=None)
63 65
    export_limit    =   BigIntegerField(null=True,  default=None)
64 66

  
67
    objects     =   ForUpdateManager()
65 68

  
66 69
class Holding(Model):
67 70

  
......
80 83
    released    =   BigIntegerField(null=False, default=0)
81 84
    releasing   =   BigIntegerField(null=False, default=0)
82 85

  
86
    objects     =   ForUpdateManager()
87

  
83 88
    class Meta:
84 89
        unique_together = (('entity', 'resource'),)
85 90

  
......
98 103
    clientkey   =   CharField(max_length=4096, null=False)
99 104
    issue_time  =   CharField(max_length=24, default=now)
100 105

  
106
    objects     =   ForUpdateManager()
101 107

  
102 108
class Provision(Model):
103 109

  
......
109 115
    resource    =   CharField(max_length=4096, null=False)
110 116
    quantity    =   BigIntegerField(null=False)
111 117

  
118
    objects     =   ForUpdateManager()
112 119

  
113 120
class ProvisionLog(Model):
114 121

  
......
138 145
    delta_quantity      =   BigIntegerField(null=False)
139 146
    reason              =   CharField(max_length=4096)
140 147

  
148
    objects     =   ForUpdateManager()
141 149

  
142 150
    def source_allocated_through(self):
143 151
        return self.source_imported - self.source_released
......
179 187
    def target_outbound(self):
180 188
        return self.target_outbound_through() + self.target_exported
181 189

  
190
def _access(*args, **kwargs):
191
    method = args[0]
192
    model = args[1]
193
    args = args[2:]
194
    o = model.objects
195
    try:
196
        if kwargs['for_update']:
197
            del kwargs['for_update']
198
            o = o.select_for_update()
199
    except KeyError:
200
        pass
201
    f = getattr(o, method)
202
    return f(*args, **kwargs)
203

  
204
def _get(*args, **kwargs):
205
    return _access('get', *args, **kwargs)
206

  
207
def _filter(*args, **kwargs):
208
    return _access('filter', *args, **kwargs)
209

  
210
def db_get_holding(*args, **kwargs):
211
    return _get(Holding, *args, **kwargs)
212

  
213
def db_get_entity(*args, **kwargs):
214
    return _get(Entity, *args, **kwargs)
215

  
216
def db_get_policy(*args, **kwargs):
217
    return _get(Policy, *args, **kwargs)
218

  
219
def db_get_commission(*args, **kwargs):
220
    return _get(Commission, *args, **kwargs)
221

  
222
def db_filter_provision(*args, **kwargs):
223
    return _filter(Provision, *args, **kwargs)

Also available in: Unified diff