Revision ee45eb81
b/snf-astakos-app/astakos/im/endpoints/qh.py | ||
---|---|---|
53 | 53 |
|
54 | 54 |
inf = float('inf') |
55 | 55 |
|
56 |
clientkey = 'astakos' |
|
57 |
|
|
56 | 58 |
_client = None |
57 | 59 |
def get_client(): |
58 | 60 |
global _client |
... | ... | |
116 | 118 |
'import_limit', |
117 | 119 |
'export_limit')) |
118 | 120 |
|
119 |
@call('add_quota') |
|
120 |
def add_quota(quotalimits_list): |
|
121 |
def qh_add_quota(serial, quotalimits_list): |
|
122 |
if not QUOTAHOLDER_URL: |
|
123 |
return () |
|
124 |
|
|
125 |
context = {} |
|
126 |
c = get_client() |
|
127 |
|
|
121 | 128 |
data = [] |
122 | 129 |
append = data.append |
123 | 130 |
for ql in quotalimits_list: |
124 | 131 |
args = (ql.holder, ql.resource, ENTITY_KEY, |
125 | 132 |
0, ql.capacity, ql.import_limit, ql.export_limit) |
126 | 133 |
append(args) |
127 |
return data |
|
128 | 134 |
|
135 |
result = c.add_quota(context=context, |
|
136 |
clientkey=clientkey, |
|
137 |
serial=serial, |
|
138 |
add_quota=data) |
|
139 |
|
|
140 |
return result |
|
141 |
|
|
142 |
def qh_query_serials(serials): |
|
143 |
if not QUOTAHOLDER_URL: |
|
144 |
return () |
|
145 |
|
|
146 |
context = {} |
|
147 |
c = get_client() |
|
148 |
result = c.query_serials(context=context, |
|
149 |
clientkey=clientkey, |
|
150 |
serials=serials) |
|
151 |
return result |
|
152 |
|
|
153 |
def qh_ack_serials(serials): |
|
154 |
if not QUOTAHOLDER_URL: |
|
155 |
return () |
|
156 |
|
|
157 |
context = {} |
|
158 |
c = get_client() |
|
159 |
result = c.ack_serials(context=context, |
|
160 |
clientkey=clientkey, |
|
161 |
serials=serials) |
|
162 |
return |
|
129 | 163 |
|
130 | 164 |
@call('set_quota') |
131 | 165 |
def send_resource_quantities(resources, client=None): |
b/snf-astakos-app/astakos/im/managers.py | ||
---|---|---|
1 |
# Copyright 2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or without |
|
4 |
# modification, are permitted provided that the following conditions |
|
5 |
# are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above copyright |
|
8 |
# notice, this list of conditions and the following disclaimer. |
|
9 |
# |
|
10 |
# 2. Redistributions in binary form must reproduce the above copyright |
|
11 |
# notice, this list of conditions and the following disclaimer in the |
|
12 |
# documentation and/or other materials provided with the distribution. |
|
13 |
# |
|
14 |
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND |
|
15 |
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
|
16 |
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
|
17 |
# ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE |
|
18 |
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL |
|
19 |
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS |
|
20 |
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) |
|
21 |
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
22 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY |
|
23 |
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF |
|
24 |
# SUCH DAMAGE. |
|
25 |
# |
|
26 |
# The views and conclusions contained in the software and documentation are |
|
27 |
# those of the authors and should not be interpreted as representing official |
|
28 |
# policies, either expressed or implied, of GRNET S.A. |
|
29 |
|
|
30 |
from django.db import connections |
|
31 |
from django.db.models import Manager |
|
32 |
from django.db.models.query import QuerySet |
|
33 |
|
|
34 |
|
|
35 |
class ForUpdateManager(Manager): |
|
36 |
""" Model manager implementing SELECT .. FOR UPDATE statement |
|
37 |
|
|
38 |
This manager implements select_for_update() method in order to use |
|
39 |
row-level locking in the database and guarantee exclusive access, since |
|
40 |
this method is only implemented in Django>=1.4. |
|
41 |
|
|
42 |
Non-blocking reads are not implemented, and each query including a row |
|
43 |
that is locked by another transaction will block until the lock is |
|
44 |
released. Also care must be taken in order to avoid deadlocks or retry |
|
45 |
transactions that abort due to deadlocks. |
|
46 |
|
|
47 |
Example: |
|
48 |
networks = Network.objects.select_for_update().filter(public=True) |
|
49 |
|
|
50 |
""" |
|
51 |
|
|
52 |
def __init__(self, *args, **kwargs): |
|
53 |
super(ForUpdateManager, self).__init__(*args, **kwargs) |
|
54 |
self._select_for_update = False |
|
55 |
|
|
56 |
def filter(self, *args, **kwargs): |
|
57 |
query = self.get_query_set().filter(*args, **kwargs) |
|
58 |
if self._select_for_update: |
|
59 |
self._select_for_update = False |
|
60 |
return for_update(query) |
|
61 |
else: |
|
62 |
return query |
|
63 |
|
|
64 |
def get(self, *args, **kwargs): |
|
65 |
if not self._select_for_update: |
|
66 |
return self.get_query_set().get(*args, **kwargs) |
|
67 |
|
|
68 |
query = self.filter(*args, **kwargs) |
|
69 |
query = list(query) |
|
70 |
num = len(query) |
|
71 |
if num == 1: |
|
72 |
return query[0] |
|
73 |
if not num: |
|
74 |
raise self.model.DoesNotExist( |
|
75 |
"%s matching query does not exist. " |
|
76 |
"Lookup parameters were %s" % |
|
77 |
(self.model._meta.object_name, kwargs)) |
|
78 |
raise self.model.MultipleObjectsReturned( |
|
79 |
"get() returned more than one %s -- it returned %s! " |
|
80 |
"Lookup parameters were %s" % |
|
81 |
(self.model._meta.object_name, num, kwargs)) |
|
82 |
|
|
83 |
def select_for_update(self, *args, **kwargs): |
|
84 |
self._select_for_update = True |
|
85 |
return self |
|
86 |
|
|
87 |
|
|
88 |
def for_update(query): |
|
89 |
""" Rewrite query using SELECT .. FOR UPDATE. |
|
90 |
|
|
91 |
""" |
|
92 |
if 'sqlite' in connections[query.db].settings_dict['ENGINE'].lower(): |
|
93 |
# SQLite does not support FOR UPDATE |
|
94 |
return query |
|
95 |
sql, params = query.query.get_compiler(query.db).as_sql() |
|
96 |
return query.model._default_manager.raw(sql.rstrip() + ' FOR UPDATE', |
|
97 |
params) |
|
98 |
|
|
99 |
|
|
100 |
class ProtectedDeleteManager(ForUpdateManager): |
|
101 |
""" Manager for protecting Backend deletion. |
|
102 |
|
|
103 |
Call Backend delete() method in order to prevent deletion |
|
104 |
of Backends that host non-deleted VirtualMachines. |
|
105 |
|
|
106 |
""" |
|
107 |
|
|
108 |
def get_query_set(self): |
|
109 |
return BackendQuerySet(self.model, using=self._db) |
|
110 |
|
|
111 |
|
|
112 |
class BackendQuerySet(QuerySet): |
|
113 |
def delete(self): |
|
114 |
for backend in self._clone(): |
|
115 |
backend.delete() |
b/snf-astakos-app/astakos/im/models.py | ||
---|---|---|
71 | 71 |
SITENAME, SERVICES, MODERATION_ENABLED) |
72 | 72 |
from astakos.im import settings as astakos_settings |
73 | 73 |
from astakos.im.endpoints.qh import ( |
74 |
register_users, send_quota, register_resources, add_quota, QuotaLimits) |
|
74 |
register_users, send_quota, register_resources, qh_add_quota, QuotaLimits, |
|
75 |
qh_query_serials, qh_ack_serials) |
|
75 | 76 |
from astakos.im import auth_providers |
76 | 77 |
#from astakos.im.endpoints.aquarium.producer import report_user_event |
77 | 78 |
#from astakos.im.tasks import propagate_groupmembers_quota |
78 | 79 |
|
79 | 80 |
import astakos.im.messages as astakos_messages |
81 |
from .managers import ForUpdateManager |
|
80 | 82 |
|
81 | 83 |
logger = logging.getLogger(__name__) |
82 | 84 |
|
... | ... | |
459 | 461 |
p = m.project |
460 | 462 |
if not p.is_active: |
461 | 463 |
continue |
462 |
grants = p.current_application.projectresourcegrant_set.all()
|
|
464 |
grants = p.application.projectresourcegrant_set.all() |
|
463 | 465 |
for g in grants: |
464 | 466 |
d[str(g.resource)] += g.member_capacity or inf |
465 | 467 |
# TODO set default for remaining |
... | ... | |
1431 | 1433 |
pass |
1432 | 1434 |
project = Project(creation_date=now) |
1433 | 1435 |
|
1434 |
project.latest_application = self |
|
1435 |
project.set_membership_replaced() |
|
1436 |
project.application = self |
|
1436 | 1437 |
|
1437 |
with exclusive_or_raise: |
|
1438 |
project.status_set_flag(Project.SYNC_PENDING_DEFINITION) |
|
1438 |
# This will block while syncing, |
|
1439 |
# but unblock before setting the membership state. |
|
1440 |
# See ProjectMembership.set_sync() |
|
1441 |
project.set_membership_pending_sync() |
|
1439 | 1442 |
|
1440 | 1443 |
project.last_approval_date = now |
1441 | 1444 |
project.save() |
... | ... | |
1475 | 1478 |
|
1476 | 1479 |
class Project(models.Model): |
1477 | 1480 |
|
1478 |
synced_application = models.OneToOneField(
|
|
1481 |
application = models.OneToOneField(
|
|
1479 | 1482 |
ProjectApplication, |
1480 |
related_name='project', |
|
1481 |
null=True) |
|
1482 |
latest_application = models.OneToOneField( |
|
1483 |
ProjectApplication, |
|
1484 |
related_name='last_project') |
|
1483 |
related_name='project') |
|
1485 | 1484 |
last_approval_date = models.DateTimeField(null=True) |
1486 | 1485 |
|
1487 | 1486 |
members = models.ManyToManyField( |
... | ... | |
1497 | 1496 |
db_index=True, |
1498 | 1497 |
unique=True) |
1499 | 1498 |
|
1500 |
status = models.IntegerField(db_index=True) |
|
1501 |
|
|
1502 |
SYNCHRONIZED = 0 |
|
1503 |
SYNC_PENDING_MEMBERSHIP = (1 << 0) |
|
1504 |
SYNC_PENDING_DEFINITION = (1 << 1) |
|
1505 |
# SYNC_PENDING = (SYNC_PENDING_DEFINITION | |
|
1506 |
# SYNC_PENDING_MEMBERSHIP) |
|
1507 |
|
|
1508 |
|
|
1509 |
def status_set_flag(self, s): |
|
1510 |
self.status |= s |
|
1511 |
|
|
1512 |
def status_unset_flag(self, s): |
|
1513 |
self.status &= ~s |
|
1514 |
|
|
1515 |
def status_is_set_flag(self, s): |
|
1516 |
return self.status & s == s |
|
1517 |
|
|
1518 |
@property |
|
1519 |
def current_application(self): |
|
1520 |
return self.synced_application or self.latest_application |
|
1521 |
|
|
1522 | 1499 |
@property |
1523 | 1500 |
def violated_resource_grants(self): |
1524 |
if self.synced_application is None: |
|
1525 |
return True |
|
1526 |
# do something |
|
1527 | 1501 |
return False |
1528 | 1502 |
|
1529 | 1503 |
@property |
1530 | 1504 |
def violated_members_number_limit(self): |
1531 |
application = self.synced_application |
|
1532 |
if application is None: |
|
1533 |
return True |
|
1505 |
application = self.application |
|
1534 | 1506 |
return len(self.approved_members) > application.limit_on_members_number |
1535 | 1507 |
|
1536 | 1508 |
@property |
... | ... | |
1578 | 1550 |
|
1579 | 1551 |
@property |
1580 | 1552 |
def approved_memberships(self): |
1553 |
ACCEPTED = ProjectMembership.ACCEPTED |
|
1554 |
PENDING = ProjectMembership.PENDING |
|
1581 | 1555 |
return self.projectmembership_set.filter( |
1582 |
synced_state=ProjectMembership.ACCEPTED)
|
|
1556 |
Q(state=ACCEPTED) | Q(state=PENDING))
|
|
1583 | 1557 |
|
1584 | 1558 |
@property |
1585 | 1559 |
def approved_members(self): |
1586 | 1560 |
return [m.person for m in self.approved_memberships] |
1587 | 1561 |
|
1588 |
def check_sync(self, hint=None): |
|
1589 |
if self.status != self.SYNCHRONIZED: |
|
1590 |
self.sync() |
|
1591 |
|
|
1592 |
def set_membership_replaced(self): |
|
1593 |
members = [m for m in self.approved_memberships |
|
1594 |
if m.sync_is_synced()] |
|
1562 |
def set_membership_pending_sync(self): |
|
1563 |
ACCEPTED = ProjectMembership.ACCEPTED |
|
1564 |
PENDING = ProjectMembership.PENDING |
|
1565 |
sfu = self.projectmembership_set.select_for_update() |
|
1566 |
members = sfu.filter(Q(state=ACCEPTED) | Q(state=PENDING)) |
|
1595 | 1567 |
|
1596 | 1568 |
for member in members: |
1597 |
member.sync_set_new_state(member.REPLACED)
|
|
1569 |
member.state = member.PENDING
|
|
1598 | 1570 |
member.save() |
1599 | 1571 |
|
1600 |
def sync_membership(self): |
|
1601 |
pending_members = self.projectmembership.filter( |
|
1602 |
sync_status=ProjectMembership.STATUS_PENDING) |
|
1603 |
for member in members: |
|
1604 |
try: |
|
1605 |
member.sync() |
|
1606 |
except Exception: |
|
1607 |
raise |
|
1608 |
|
|
1609 |
still_pending_members = self.members.filter( |
|
1610 |
sync_status=ProjectMembership.STATUS_PENDING) |
|
1611 |
if not still_pending_members: |
|
1612 |
with exclusive_or_raise: |
|
1613 |
self.status_unset_flag(self.SYNC_PENDING_MEMBERSHIP) |
|
1614 |
self.save() |
|
1615 |
|
|
1616 |
def sync_definition(self): |
|
1617 |
try: |
|
1618 |
self.sync_membership() |
|
1619 |
except Exception: |
|
1620 |
raise |
|
1621 |
else: |
|
1622 |
with exclusive_or_raise: |
|
1623 |
self.status_unset_flag(self.SYNC_PENDING_DEFINITION) |
|
1624 |
self.synced_application = self.latest_application |
|
1625 |
self.save() |
|
1626 |
|
|
1627 |
def sync(self): |
|
1628 |
if self.status_is_set_flag(self.SYNC_PENDING_DEFINITION): |
|
1629 |
self.sync_definition() |
|
1630 |
if self.status_is_set_flag(self.SYNC_PENDING_MEMBERSHIP): |
|
1631 |
self.sync_membership() |
|
1632 |
|
|
1633 | 1572 |
def add_member(self, user): |
1634 | 1573 |
""" |
1635 | 1574 |
Raises: |
... | ... | |
1745 | 1684 |
acceptance_date = models.DateField(null=True, db_index=True) |
1746 | 1685 |
leave_request_date = models.DateField(null=True) |
1747 | 1686 |
|
1687 |
objects = ForUpdateManager() |
|
1688 |
|
|
1748 | 1689 |
REQUESTED = 0 |
1749 | 1690 |
PENDING = 1 |
1750 | 1691 |
ACCEPTED = 2 |
1751 | 1692 |
REMOVING = 3 |
1752 | 1693 |
REMOVED = 4 |
1753 |
REJECTED = 5 # never seen, because of .delete() |
|
1754 |
REPLACED = 6 # when the project definition is replaced |
|
1755 |
# spontaneously goes back to ACCEPTED when synced |
|
1756 | 1694 |
|
1757 | 1695 |
class Meta: |
1758 | 1696 |
unique_together = ("person", "project") |
... | ... | |
1765 | 1703 |
__repr__ = __str__ |
1766 | 1704 |
|
1767 | 1705 |
def __init__(self, *args, **kwargs): |
1768 |
self.sync_init_state(self.REQUEST)
|
|
1706 |
self.state = self.REQUESTED
|
|
1769 | 1707 |
super(ProjectMembership, self).__init__(*args, **kwargs) |
1770 | 1708 |
|
1771 | 1709 |
def _set_history_item(self, reason, date=None): |
... | ... | |
1791 | 1729 |
self._set_history_item(reason='ACCEPT', date=now) |
1792 | 1730 |
self.state = self.PENDING |
1793 | 1731 |
self.save() |
1732 |
trigger_sync() |
|
1794 | 1733 |
|
1795 | 1734 |
def remove(self): |
1796 | 1735 |
if state != self.ACCEPTED: |
1797 | 1736 |
m = _("%s: attempt to remove in state '%s'") % (self, state) |
1798 | 1737 |
raise AssertionError(m) |
1799 | 1738 |
|
1800 |
serial = self._set_history_item(reason='REMOVE')
|
|
1739 |
self._set_history_item(reason='REMOVE') |
|
1801 | 1740 |
self.state = self.REMOVING |
1802 | 1741 |
self.save() |
1742 |
trigger_sync() |
|
1803 | 1743 |
|
1804 | 1744 |
def reject(self): |
1805 | 1745 |
if state != self.REQUESTED: |
... | ... | |
1850 | 1790 |
import_limit = cur_grant.import_limit |
1851 | 1791 |
export_limit = cur_grant.export_limit |
1852 | 1792 |
|
1853 |
capacity += factor * new_grant.member_capacity
|
|
1854 |
import_limit += factor * new_grant.member_import_limit
|
|
1855 |
export_limit += factor * new_grant.member_export_limit
|
|
1793 |
capacity += new_grant.member_capacity |
|
1794 |
import_limit += new_grant.member_import_limit |
|
1795 |
export_limit += new_grant.member_export_limit |
|
1856 | 1796 |
|
1857 | 1797 |
append(QuotaLimits(holder = holder, |
1858 | 1798 |
key = key, |
... | ... | |
1865 | 1805 |
limits_list.extend(tmp_grants.itervalues()) |
1866 | 1806 |
return limits_list |
1867 | 1807 |
|
1808 |
def set_sync(self): |
|
1809 |
state = self.state |
|
1810 |
if state == self.PENDING: |
|
1811 |
pending_application = self.pending_application |
|
1812 |
if pending_application is None: |
|
1813 |
m = _("%s: attempt to sync an empty pending application") % ( |
|
1814 |
self, state) |
|
1815 |
raise AssertionError(m) |
|
1816 |
self.application = pending_application |
|
1817 |
self.pending_application = None |
|
1818 |
self.pending_serial = None |
|
1819 |
|
|
1820 |
# project.application may have changed in the meantime, |
|
1821 |
# in which case we stay PENDING; |
|
1822 |
# we are safe to check due to select_for_update |
|
1823 |
if self.application == self.project.application: |
|
1824 |
self.state = self.ACCEPTED |
|
1825 |
self.save() |
|
1826 |
elif state == self.REMOVING: |
|
1827 |
self.delete() |
|
1828 |
else: |
|
1829 |
m = _("%s: attempt to sync in state '%s'") % (self, state) |
|
1830 |
raise AssertionError(m) |
|
1831 |
|
|
1832 |
class Serial(models.Model): |
|
1833 |
serial = models.AutoField(primary_key=True) |
|
1834 |
|
|
1835 |
def new_serial(): |
|
1836 |
s = Serial.create() |
|
1837 |
return s.serial |
|
1868 | 1838 |
|
1869 | 1839 |
def sync_finish_serials(): |
1870 |
serials_to_ack = set(quotaholder_query_sync_serials([]))
|
|
1840 |
serials_to_ack = set(qh_query_serials([]))
|
|
1871 | 1841 |
sfu = ProjectMembership.objects.select_for_update() |
1872 | 1842 |
memberships = sfu.filter(pending_serial__isnull=False) |
1873 | 1843 |
|
... | ... | |
1879 | 1849 |
membership.set_sync() |
1880 | 1850 |
|
1881 | 1851 |
transaction.commit() |
1882 |
quotaholder_ack_sync_serials(list(serials_to_ack)) |
|
1883 |
|
|
1852 |
qh_ack_serials(list(serials_to_ack)) |
|
1884 | 1853 |
|
1885 | 1854 |
def sync_projects(): |
1886 | 1855 |
sync_finish_serials() |
... | ... | |
1889 | 1858 |
REMOVING = ProjectMembership.REMOVING |
1890 | 1859 |
objects = ProjectMembership.objects.select_for_update() |
1891 | 1860 |
|
1892 |
projects = set() |
|
1893 | 1861 |
quotas = [] |
1894 | 1862 |
|
1895 |
serial = get_serial()
|
|
1863 |
serial = new_serial()
|
|
1896 | 1864 |
|
1897 | 1865 |
pending = objects.filter(state=PENDING) |
1898 | 1866 |
for membership in pending: |
... | ... | |
1906 | 1874 |
membership, membership.pending_serial) |
1907 | 1875 |
raise AssertionError(m) |
1908 | 1876 |
|
1909 |
membership.pending_application = membership.project.latest_application
|
|
1877 |
membership.pending_application = membership.project.application |
|
1910 | 1878 |
membership.pending_serial = serial |
1911 | 1879 |
membership.get_diff_quotas(quotas) |
1912 | 1880 |
membership.save() |
... | ... | |
1928 | 1896 |
membership.save() |
1929 | 1897 |
|
1930 | 1898 |
transaction.commit() |
1931 |
quotaholder_add_quotas(serial, quotas) |
|
1899 |
# ProjectApplication.approve() unblocks here |
|
1900 |
# and can set PENDING an already PENDING membership |
|
1901 |
# which has been scheduled to sync with the old project.application |
|
1902 |
# Need to check in ProjectMembership.set_sync() |
|
1903 |
|
|
1904 |
qh_add_quota(serial, quotas) |
|
1932 | 1905 |
sync_finish_serials() |
1933 | 1906 |
|
1934 | 1907 |
|
Also available in: Unified diff