Revision 5d996aea
b/snf-quotaholder-app/quotaholder_django/quotaholder_app/callpoint.py | ||
---|---|---|
45 | 45 |
from django.db.models import Q |
46 | 46 |
from django.db import transaction, IntegrityError |
47 | 47 |
from .models import (Holder, Entity, Policy, Holding, |
48 |
Commission, Provision, ProvisionLog, now) |
|
48 |
Commission, Provision, ProvisionLog, now, |
|
49 |
db_get_entity, db_get_holding, db_get_policy, |
|
50 |
db_get_commission, db_filter_provision) |
|
49 | 51 |
|
50 | 52 |
|
51 | 53 |
class QuotaholderDjangoDBCallpoint(Callpoint): |
... | ... | |
107 | 109 |
|
108 | 110 |
for entity, key, newkey in set_entity_key: |
109 | 111 |
try: |
110 |
e = Entity.objects.get(entity=entity, key=key)
|
|
112 |
e = db_get_entity(entity=entity, key=key, for_update=True)
|
|
111 | 113 |
except Entity.DoesNotExist: |
112 | 114 |
append(entity) |
113 | 115 |
continue |
... | ... | |
163 | 165 |
import_limit, export_limit ) in set_limits: |
164 | 166 |
|
165 | 167 |
try: |
166 |
policy = Policy.objects.get(policy=policy)
|
|
168 |
policy = db_get_policy(policy=policy, for_update=True)
|
|
167 | 169 |
except Policy.DoesNotExist: |
168 | 170 |
Policy.objects.create( policy=policy, |
169 | 171 |
quantity=quantity, |
... | ... | |
200 | 202 |
|
201 | 203 |
def _set_holding(self, entity, resource, policy, flags): |
202 | 204 |
try: |
203 |
h = Holding.objects.get(entity=entity, resource=resource) |
|
205 |
h = db_get_holding(entity=entity, resource=resource, |
|
206 |
for_update=True) |
|
204 | 207 |
h.policy = p |
205 | 208 |
h.flags = flags |
206 | 209 |
h.save() |
... | ... | |
231 | 234 |
continue |
232 | 235 |
|
233 | 236 |
try: |
234 |
h = Holding.objects.get(entity=entity, resource=resource) |
|
237 |
h = db_get_holding(entity=entity, resource=resource, |
|
238 |
for_update=True) |
|
235 | 239 |
h.policy = p |
236 | 240 |
h.flags = flags |
237 | 241 |
h.save() |
... | ... | |
245 | 249 |
imported, exported, returned, released, |
246 | 250 |
flags): |
247 | 251 |
try: |
248 |
h = Holding.objects.get(entity=entity, resource=resource) |
|
252 |
h = db_get_holding(entity=entity, resource=resource, |
|
253 |
for_update=True) |
|
249 | 254 |
except Holding.DoesNotExist: |
250 | 255 |
h = Holding(entity=entity, resource=resource) |
251 | 256 |
|
... | ... | |
305 | 310 |
continue |
306 | 311 |
|
307 | 312 |
try: |
308 |
h = Holding.objects.get(entity=entity, resource=resource) |
|
313 |
h = db_get_holding(entity=entity, resource=resource, |
|
314 |
for_update=True) |
|
309 | 315 |
h.imported=imported |
310 | 316 |
h.importing=imported |
311 | 317 |
h.exported=exported |
... | ... | |
341 | 347 |
|
342 | 348 |
def _increase_resource(self, entity, resource, amount): |
343 | 349 |
try: |
344 |
h = Holding.objects.get(entity=entity, resource=resource) |
|
350 |
h = db_get_holding(entity=entity, resource=resource, |
|
351 |
for_update=True) |
|
345 | 352 |
except Holding.DoesNotExist: |
346 | 353 |
h = Holding(entity=entity, resource=resource) |
347 | 354 |
p = Policy.objects.create(policy=self._new_policy_name(), |
... | ... | |
356 | 363 |
|
357 | 364 |
for idx, (entity, resource, key) in enumerate(release_holding): |
358 | 365 |
try: |
359 |
h = Holding.objects.get(entity=entity, resource=resource) |
|
366 |
h = db_get_holding(entity=entity, resource=resource, |
|
367 |
for_update=True) |
|
360 | 368 |
except Holding.DoesNotExist: |
361 | 369 |
append(idx) |
362 | 370 |
continue |
... | ... | |
462 | 470 |
) |
463 | 471 |
|
464 | 472 |
try: |
465 |
h = Holding.objects.get(entity=entity, resource=resource) |
|
473 |
h = db_get_holding(entity=entity, resource=resource, |
|
474 |
for_update=True) |
|
466 | 475 |
p = h.policy |
467 | 476 |
h.policy = newp |
468 | 477 |
h.flags = flags |
... | ... | |
528 | 537 |
release = 1 |
529 | 538 |
|
530 | 539 |
try: |
531 |
h = Holding.objects.get(entity=entity, resource=resource) |
|
540 |
h = db_get_holding(entity=entity, resource=resource, |
|
541 |
for_update=True) |
|
532 | 542 |
except Holding.DoesNotExist: |
533 | 543 |
m = ("There is not enough quantity " |
534 | 544 |
"to allocate from in %s.%s" % (entity, resource)) |
... | ... | |
551 | 561 |
raise NoQuantityError(m) |
552 | 562 |
|
553 | 563 |
try: |
554 |
th = Holding.objects.get(entity=target, resource=resource) |
|
564 |
th = db_get_holding(entity=target, resource=resource, |
|
565 |
for_update=True) |
|
555 | 566 |
except Holding.DoesNotExist: |
556 | 567 |
m = ("There is not enough capacity " |
557 | 568 |
"to allocate into in %s.%s" % (target, resource)) |
... | ... | |
630 | 641 |
|
631 | 642 |
for serial in serials: |
632 | 643 |
try: |
633 |
c = Commission.objects.get(clientkey=clientkey, serial=serial) |
|
644 |
c = db_get_commission(clientkey=clientkey, serial=serial, |
|
645 |
for_update=True) |
|
634 | 646 |
except Commission.DoesNotExist: |
635 | 647 |
return |
636 | 648 |
|
637 | 649 |
t = c.entity |
638 | 650 |
|
639 |
provisions = Provision.objects.filter(serial=serial)
|
|
651 |
provisions = db_filter_provision(serial=serial, for_update=True)
|
|
640 | 652 |
for pv in provisions: |
641 | 653 |
try: |
642 |
h = Holding.objects.get(entity=pv.entity.entity, |
|
643 |
resource=pv.resource ) |
|
644 |
th = Holding.objects.get(entity=t, resource=pv.resource) |
|
654 |
h = db_get_holding(entity=pv.entity.entity, |
|
655 |
resource=pv.resource, for_update=True) |
|
656 |
th = db_get_holding(entity=t, resource=pv.resource, |
|
657 |
for_update=True) |
|
645 | 658 |
except Holding.DoesNotExist: |
646 | 659 |
m = "Corrupted provision" |
647 | 660 |
raise CorruptedError(m) |
... | ... | |
673 | 686 |
|
674 | 687 |
for serial in serials: |
675 | 688 |
try: |
676 |
c = Commission.objects.get(clientkey=clientkey, serial=serial) |
|
689 |
c = db_get_commission(clientkey=clientkey, serial=serial, |
|
690 |
for_update=True) |
|
677 | 691 |
except Commission.DoesNotExist: |
678 | 692 |
return |
679 | 693 |
|
680 | 694 |
t = c.entity |
681 | 695 |
|
682 |
provisions = Provision.objects.filter(serial=serial)
|
|
696 |
provisions = db_filter_provision(serial=serial, for_update=True)
|
|
683 | 697 |
for pv in provisions: |
684 | 698 |
try: |
685 |
h = Holding.objects.get(entity=pv.entity.entity, |
|
686 |
resource=pv.resource) |
|
687 |
th = Holding.objects.get(entity=t, resource=pv.resource) |
|
699 |
h = db_get_holding(entity=pv.entity.entity, |
|
700 |
resource=pv.resource, for_update=True) |
|
701 |
th = db_get_holding(entity=t, resource=pv.resource, |
|
702 |
for_update=True) |
|
688 | 703 |
except Holding.DoesNotExist: |
689 | 704 |
m = "Corrupted provision" |
690 | 705 |
raise CorruptedError(m) |
... | ... | |
740 | 755 |
append = rejected.append |
741 | 756 |
for entity, key in release_entity: |
742 | 757 |
try: |
743 |
e = Entity.objects.get(entity=entity, key=key)
|
|
758 |
e = db_get_entity(entity=entity, key=key, for_update=True)
|
|
744 | 759 |
except Entity.DoesNotExist: |
745 | 760 |
append(entity) |
746 | 761 |
continue |
b/snf-quotaholder-app/quotaholder_django/quotaholder_app/managers.py | ||
---|---|---|
1 |
# Copyright 2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or without |
|
4 |
# modification, are permitted provided that the following conditions |
|
5 |
# are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above copyright |
|
8 |
# notice, this list of conditions and the following disclaimer. |
|
9 |
# |
|
10 |
# 2. Redistributions in binary form must reproduce the above copyright |
|
11 |
# notice, this list of conditions and the following disclaimer in the |
|
12 |
# documentation and/or other materials provided with the distribution. |
|
13 |
# |
|
14 |
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND |
|
15 |
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
|
16 |
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
|
17 |
# ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE |
|
18 |
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL |
|
19 |
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS |
|
20 |
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) |
|
21 |
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
22 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY |
|
23 |
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF |
|
24 |
# SUCH DAMAGE. |
|
25 |
# |
|
26 |
# The views and conclusions contained in the software and documentation are |
|
27 |
# those of the authors and should not be interpreted as representing official |
|
28 |
# policies, either expressed or implied, of GRNET S.A. |
|
29 |
|
|
30 |
from django.db import connections |
|
31 |
from django.db.models import Manager |
|
32 |
from django.db.models.query import QuerySet |
|
33 |
|
|
34 |
|
|
35 |
class ForUpdateManager(Manager): |
|
36 |
""" Model manager implementing SELECT .. FOR UPDATE statement |
|
37 |
|
|
38 |
This manager implements select_for_update() method in order to use |
|
39 |
row-level locking in the database and guarantee exclusive access, since |
|
40 |
this method is only implemented in Django>=1.4. |
|
41 |
|
|
42 |
Non-blocking reads are not implemented, and each query including a row |
|
43 |
that is locked by another transaction will block until the lock is |
|
44 |
released. Also care must be taken in order to avoid deadlocks or retry |
|
45 |
transactions that abort due to deadlocks. |
|
46 |
|
|
47 |
Example: |
|
48 |
networks = Network.objects.select_for_update().filter(public=True) |
|
49 |
|
|
50 |
""" |
|
51 |
|
|
52 |
def __init__(self, *args, **kwargs): |
|
53 |
super(ForUpdateManager, self).__init__(*args, **kwargs) |
|
54 |
self._select_for_update = False |
|
55 |
|
|
56 |
def filter(self, *args, **kwargs): |
|
57 |
query = self.get_query_set().filter(*args, **kwargs) |
|
58 |
if self._select_for_update: |
|
59 |
self._select_for_update = False |
|
60 |
return for_update(query) |
|
61 |
else: |
|
62 |
return query |
|
63 |
|
|
64 |
def get(self, *args, **kwargs): |
|
65 |
if not self._select_for_update: |
|
66 |
return self.get_query_set().get(*args, **kwargs) |
|
67 |
|
|
68 |
query = self.filter(*args, **kwargs) |
|
69 |
query = list(query) |
|
70 |
num = len(query) |
|
71 |
if num == 1: |
|
72 |
return query[0] |
|
73 |
if not num: |
|
74 |
raise self.model.DoesNotExist( |
|
75 |
"%s matching query does not exist. " |
|
76 |
"Lookup parameters were %s" % |
|
77 |
(self.model._meta.object_name, kwargs)) |
|
78 |
raise self.model.MultipleObjectsReturned( |
|
79 |
"get() returned more than one %s -- it returned %s! " |
|
80 |
"Lookup parameters were %s" % |
|
81 |
(self.model._meta.object_name, num, kwargs)) |
|
82 |
|
|
83 |
def select_for_update(self, *args, **kwargs): |
|
84 |
self._select_for_update = True |
|
85 |
return self |
|
86 |
|
|
87 |
|
|
88 |
def for_update(query): |
|
89 |
""" Rewrite query using SELECT .. FOR UPDATE. |
|
90 |
|
|
91 |
""" |
|
92 |
if 'sqlite' in connections[query.db].settings_dict['ENGINE'].lower(): |
|
93 |
# SQLite does not support FOR UPDATE |
|
94 |
return query |
|
95 |
sql, params = query.query.get_compiler(query.db).as_sql() |
|
96 |
return query.model._default_manager.raw(sql.rstrip() + ' FOR UPDATE', |
|
97 |
params) |
|
98 |
|
|
99 |
|
|
100 |
class ProtectedDeleteManager(ForUpdateManager): |
|
101 |
""" Manager for protecting Backend deletion. |
|
102 |
|
|
103 |
Call Backend delete() method in order to prevent deletion |
|
104 |
of Backends that host non-deleted VirtualMachines. |
|
105 |
|
|
106 |
""" |
|
107 |
|
|
108 |
def get_query_set(self): |
|
109 |
return BackendQuerySet(self.model, using=self._db) |
|
110 |
|
|
111 |
|
|
112 |
class BackendQuerySet(QuerySet): |
|
113 |
def delete(self): |
|
114 |
for backend in self._clone(): |
|
115 |
backend.delete() |
b/snf-quotaholder-app/quotaholder_django/quotaholder_app/models.py | ||
---|---|---|
37 | 37 |
from django.db.models import (Model, BigIntegerField, CharField, |
38 | 38 |
ForeignKey, AutoField) |
39 | 39 |
from django.db import transaction |
40 |
|
|
40 |
from .managers import ForUpdateManager |
|
41 | 41 |
|
42 | 42 |
class Holder(Model): |
43 | 43 |
|
... | ... | |
45 | 45 |
intval = BigIntegerField() |
46 | 46 |
strval = CharField(max_length=4096) |
47 | 47 |
|
48 |
objects = ForUpdateManager() |
|
48 | 49 |
|
49 | 50 |
class Entity(Model): |
50 | 51 |
|
... | ... | |
53 | 54 |
related_name='entities') |
54 | 55 |
key = CharField(max_length=4096, null=False) |
55 | 56 |
|
57 |
objects = ForUpdateManager() |
|
56 | 58 |
|
57 | 59 |
class Policy(Model): |
58 | 60 |
|
... | ... | |
62 | 64 |
import_limit = BigIntegerField(null=True, default=None) |
63 | 65 |
export_limit = BigIntegerField(null=True, default=None) |
64 | 66 |
|
67 |
objects = ForUpdateManager() |
|
65 | 68 |
|
66 | 69 |
class Holding(Model): |
67 | 70 |
|
... | ... | |
80 | 83 |
released = BigIntegerField(null=False, default=0) |
81 | 84 |
releasing = BigIntegerField(null=False, default=0) |
82 | 85 |
|
86 |
objects = ForUpdateManager() |
|
87 |
|
|
83 | 88 |
class Meta: |
84 | 89 |
unique_together = (('entity', 'resource'),) |
85 | 90 |
|
... | ... | |
98 | 103 |
clientkey = CharField(max_length=4096, null=False) |
99 | 104 |
issue_time = CharField(max_length=24, default=now) |
100 | 105 |
|
106 |
objects = ForUpdateManager() |
|
101 | 107 |
|
102 | 108 |
class Provision(Model): |
103 | 109 |
|
... | ... | |
109 | 115 |
resource = CharField(max_length=4096, null=False) |
110 | 116 |
quantity = BigIntegerField(null=False) |
111 | 117 |
|
118 |
objects = ForUpdateManager() |
|
112 | 119 |
|
113 | 120 |
class ProvisionLog(Model): |
114 | 121 |
|
... | ... | |
138 | 145 |
delta_quantity = BigIntegerField(null=False) |
139 | 146 |
reason = CharField(max_length=4096) |
140 | 147 |
|
148 |
objects = ForUpdateManager() |
|
141 | 149 |
|
142 | 150 |
def source_allocated_through(self): |
143 | 151 |
return self.source_imported - self.source_released |
... | ... | |
179 | 187 |
def target_outbound(self): |
180 | 188 |
return self.target_outbound_through() + self.target_exported |
181 | 189 |
|
190 |
def _access(*args, **kwargs): |
|
191 |
method = args[0] |
|
192 |
model = args[1] |
|
193 |
args = args[2:] |
|
194 |
o = model.objects |
|
195 |
try: |
|
196 |
if kwargs['for_update']: |
|
197 |
del kwargs['for_update'] |
|
198 |
o = o.select_for_update() |
|
199 |
except KeyError: |
|
200 |
pass |
|
201 |
f = getattr(o, method) |
|
202 |
return f(*args, **kwargs) |
|
203 |
|
|
204 |
def _get(*args, **kwargs): |
|
205 |
return _access('get', *args, **kwargs) |
|
206 |
|
|
207 |
def _filter(*args, **kwargs): |
|
208 |
return _access('filter', *args, **kwargs) |
|
209 |
|
|
210 |
def db_get_holding(*args, **kwargs): |
|
211 |
return _get(Holding, *args, **kwargs) |
|
212 |
|
|
213 |
def db_get_entity(*args, **kwargs): |
|
214 |
return _get(Entity, *args, **kwargs) |
|
215 |
|
|
216 |
def db_get_policy(*args, **kwargs): |
|
217 |
return _get(Policy, *args, **kwargs) |
|
218 |
|
|
219 |
def db_get_commission(*args, **kwargs): |
|
220 |
return _get(Commission, *args, **kwargs) |
|
221 |
|
|
222 |
def db_filter_provision(*args, **kwargs): |
|
223 |
return _filter(Provision, *args, **kwargs) |
Also available in: Unified diff