Revision ee45eb81

b/snf-astakos-app/astakos/im/endpoints/qh.py
53 53

  
54 54
inf = float('inf')
55 55

  
56
clientkey = 'astakos'
57

  
56 58
_client = None
57 59
def get_client():
58 60
    global _client
......
116 118
                                         'import_limit',
117 119
                                         'export_limit'))
118 120

  
119
@call('add_quota')
120
def add_quota(quotalimits_list):
121
def qh_add_quota(serial, quotalimits_list):
122
    if not QUOTAHOLDER_URL:
123
        return ()
124

  
125
    context = {}
126
    c = get_client()
127

  
121 128
    data = []
122 129
    append = data.append
123 130
    for ql in quotalimits_list:
124 131
        args = (ql.holder, ql.resource, ENTITY_KEY,
125 132
                0, ql.capacity, ql.import_limit, ql.export_limit)
126 133
        append(args)
127
    return data
128 134

  
135
    result = c.add_quota(context=context,
136
                         clientkey=clientkey,
137
                         serial=serial,
138
                         add_quota=data)
139

  
140
    return result
141

  
142
def qh_query_serials(serials):
143
    if not QUOTAHOLDER_URL:
144
        return ()
145

  
146
    context = {}
147
    c = get_client()
148
    result = c.query_serials(context=context,
149
                             clientkey=clientkey,
150
                             serials=serials)
151
    return result
152

  
153
def qh_ack_serials(serials):
154
    if not QUOTAHOLDER_URL:
155
        return ()
156

  
157
    context = {}
158
    c = get_client()
159
    result = c.ack_serials(context=context,
160
                           clientkey=clientkey,
161
                           serials=serials)
162
    return
129 163

  
130 164
@call('set_quota')
131 165
def send_resource_quantities(resources, client=None):
b/snf-astakos-app/astakos/im/managers.py
1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

  
30
from django.db import connections
31
from django.db.models import Manager
32
from django.db.models.query import QuerySet
33

  
34

  
35
class ForUpdateManager(Manager):
36
    """ Model manager implementing SELECT .. FOR UPDATE statement
37

  
38
        This manager implements select_for_update() method in order to use
39
        row-level locking in the database and guarantee exclusive access, since
40
        this method is only implemented in Django>=1.4.
41

  
42
        Non-blocking reads are not implemented, and each query including a row
43
        that is locked by another transaction will block until the lock is
44
        released. Also care must be taken in order to avoid deadlocks or retry
45
        transactions that abort due to deadlocks.
46

  
47
        Example:
48
            networks = Network.objects.select_for_update().filter(public=True)
49

  
50
    """
51

  
52
    def __init__(self, *args, **kwargs):
53
        super(ForUpdateManager, self).__init__(*args, **kwargs)
54
        self._select_for_update = False
55

  
56
    def filter(self, *args, **kwargs):
57
        query = self.get_query_set().filter(*args, **kwargs)
58
        if self._select_for_update:
59
            self._select_for_update = False
60
            return for_update(query)
61
        else:
62
            return query
63

  
64
    def get(self, *args, **kwargs):
65
        if not self._select_for_update:
66
            return self.get_query_set().get(*args, **kwargs)
67

  
68
        query = self.filter(*args, **kwargs)
69
        query = list(query)
70
        num = len(query)
71
        if num == 1:
72
            return query[0]
73
        if not num:
74
            raise self.model.DoesNotExist(
75
                    "%s matching query does not exist. "
76
                    "Lookup parameters were %s" %
77
                    (self.model._meta.object_name, kwargs))
78
        raise self.model.MultipleObjectsReturned(
79
            "get() returned more than one %s -- it returned %s! "
80
            "Lookup parameters were %s" %
81
            (self.model._meta.object_name, num, kwargs))
82

  
83
    def select_for_update(self, *args, **kwargs):
84
        self._select_for_update = True
85
        return self
86

  
87

  
88
def for_update(query):
89
    """ Rewrite query using SELECT .. FOR UPDATE.
90

  
91
    """
92
    if 'sqlite' in connections[query.db].settings_dict['ENGINE'].lower():
93
        # SQLite  does not support FOR UPDATE
94
        return query
95
    sql, params = query.query.get_compiler(query.db).as_sql()
96
    return query.model._default_manager.raw(sql.rstrip() + ' FOR UPDATE',
97
                                            params)
98

  
99

  
100
class ProtectedDeleteManager(ForUpdateManager):
101
    """ Manager for protecting Backend deletion.
102

  
103
        Call Backend delete() method in order to prevent deletion
104
        of Backends that host non-deleted VirtualMachines.
105

  
106
    """
107

  
108
    def get_query_set(self):
109
        return BackendQuerySet(self.model, using=self._db)
110

  
111

  
112
class BackendQuerySet(QuerySet):
113
    def delete(self):
114
        for backend in self._clone():
115
            backend.delete()
b/snf-astakos-app/astakos/im/models.py
71 71
    SITENAME, SERVICES, MODERATION_ENABLED)
72 72
from astakos.im import settings as astakos_settings
73 73
from astakos.im.endpoints.qh import (
74
    register_users, send_quota, register_resources, add_quota, QuotaLimits)
74
    register_users, send_quota, register_resources, qh_add_quota, QuotaLimits,
75
    qh_query_serials, qh_ack_serials)
75 76
from astakos.im import auth_providers
76 77
#from astakos.im.endpoints.aquarium.producer import report_user_event
77 78
#from astakos.im.tasks import propagate_groupmembers_quota
78 79

  
79 80
import astakos.im.messages as astakos_messages
81
from .managers import ForUpdateManager
80 82

  
81 83
logger = logging.getLogger(__name__)
82 84

  
......
459 461
            p = m.project
460 462
            if not p.is_active:
461 463
                continue
462
            grants = p.current_application.projectresourcegrant_set.all()
464
            grants = p.application.projectresourcegrant_set.all()
463 465
            for g in grants:
464 466
                d[str(g.resource)] += g.member_capacity or inf
465 467
        # TODO set default for remaining
......
1431 1433
                pass
1432 1434
            project = Project(creation_date=now)
1433 1435

  
1434
        project.latest_application = self
1435
        project.set_membership_replaced()
1436
        project.application = self
1436 1437

  
1437
        with exclusive_or_raise:
1438
            project.status_set_flag(Project.SYNC_PENDING_DEFINITION)
1438
        # This will block while syncing,
1439
        # but unblock before setting the membership state.
1440
        # See ProjectMembership.set_sync()
1441
        project.set_membership_pending_sync()
1439 1442

  
1440 1443
        project.last_approval_date = now
1441 1444
        project.save()
......
1475 1478

  
1476 1479
class Project(models.Model):
1477 1480

  
1478
    synced_application          =   models.OneToOneField(
1481
    application                 =   models.OneToOneField(
1479 1482
                                            ProjectApplication,
1480
                                            related_name='project',
1481
                                            null=True)
1482
    latest_application          =   models.OneToOneField(
1483
                                            ProjectApplication,
1484
                                            related_name='last_project')
1483
                                            related_name='project')
1485 1484
    last_approval_date          =   models.DateTimeField(null=True)
1486 1485

  
1487 1486
    members                     =   models.ManyToManyField(
......
1497 1496
                                            db_index=True,
1498 1497
                                            unique=True)
1499 1498

  
1500
    status                      =   models.IntegerField(db_index=True)
1501

  
1502
    SYNCHRONIZED                =   0
1503
    SYNC_PENDING_MEMBERSHIP     =   (1 << 0)
1504
    SYNC_PENDING_DEFINITION     =   (1 << 1)
1505
    # SYNC_PENDING                =   (SYNC_PENDING_DEFINITION |
1506
    #                                  SYNC_PENDING_MEMBERSHIP)
1507

  
1508

  
1509
    def status_set_flag(self, s):
1510
        self.status |= s
1511

  
1512
    def status_unset_flag(self, s):
1513
        self.status &= ~s
1514

  
1515
    def status_is_set_flag(self, s):
1516
        return self.status & s == s
1517

  
1518
    @property
1519
    def current_application(self):
1520
        return self.synced_application or self.latest_application
1521

  
1522 1499
    @property
1523 1500
    def violated_resource_grants(self):
1524
        if self.synced_application is None:
1525
            return True
1526
        # do something
1527 1501
        return False
1528 1502

  
1529 1503
    @property
1530 1504
    def violated_members_number_limit(self):
1531
        application = self.synced_application
1532
        if application is None:
1533
            return True
1505
        application = self.application
1534 1506
        return len(self.approved_members) > application.limit_on_members_number
1535 1507

  
1536 1508
    @property
......
1578 1550

  
1579 1551
    @property
1580 1552
    def approved_memberships(self):
1553
        ACCEPTED = ProjectMembership.ACCEPTED
1554
        PENDING  = ProjectMembership.PENDING
1581 1555
        return self.projectmembership_set.filter(
1582
            synced_state=ProjectMembership.ACCEPTED)
1556
            Q(state=ACCEPTED) | Q(state=PENDING))
1583 1557

  
1584 1558
    @property
1585 1559
    def approved_members(self):
1586 1560
        return [m.person for m in self.approved_memberships]
1587 1561

  
1588
    def check_sync(self, hint=None):
1589
        if self.status != self.SYNCHRONIZED:
1590
            self.sync()
1591

  
1592
    def set_membership_replaced(self):
1593
        members = [m for m in self.approved_memberships
1594
                   if m.sync_is_synced()]
1562
    def set_membership_pending_sync(self):
1563
        ACCEPTED = ProjectMembership.ACCEPTED
1564
        PENDING  = ProjectMembership.PENDING
1565
        sfu = self.projectmembership_set.select_for_update()
1566
        members = sfu.filter(Q(state=ACCEPTED) | Q(state=PENDING))
1595 1567

  
1596 1568
        for member in members:
1597
            member.sync_set_new_state(member.REPLACED)
1569
            member.state = member.PENDING
1598 1570
            member.save()
1599 1571

  
1600
    def sync_membership(self):
1601
        pending_members = self.projectmembership.filter(
1602
            sync_status=ProjectMembership.STATUS_PENDING)
1603
        for member in members:
1604
            try:
1605
                member.sync()
1606
            except Exception:
1607
                raise
1608

  
1609
        still_pending_members = self.members.filter(
1610
            sync_status=ProjectMembership.STATUS_PENDING)
1611
        if not still_pending_members:
1612
            with exclusive_or_raise:
1613
                self.status_unset_flag(self.SYNC_PENDING_MEMBERSHIP)
1614
                self.save()
1615

  
1616
    def sync_definition(self):
1617
        try:
1618
            self.sync_membership()
1619
        except Exception:
1620
            raise
1621
        else:
1622
            with exclusive_or_raise:
1623
                self.status_unset_flag(self.SYNC_PENDING_DEFINITION)
1624
                self.synced_application = self.latest_application
1625
                self.save()
1626

  
1627
    def sync(self):
1628
        if self.status_is_set_flag(self.SYNC_PENDING_DEFINITION):
1629
            self.sync_definition()
1630
        if self.status_is_set_flag(self.SYNC_PENDING_MEMBERSHIP):
1631
            self.sync_membership()
1632

  
1633 1572
    def add_member(self, user):
1634 1573
        """
1635 1574
        Raises:
......
1745 1684
    acceptance_date     =   models.DateField(null=True, db_index=True)
1746 1685
    leave_request_date  =   models.DateField(null=True)
1747 1686

  
1687
    objects     =   ForUpdateManager()
1688

  
1748 1689
    REQUESTED   =   0
1749 1690
    PENDING     =   1
1750 1691
    ACCEPTED    =   2
1751 1692
    REMOVING    =   3
1752 1693
    REMOVED     =   4
1753
    REJECTED    =   5   # never seen, because of .delete()
1754
    REPLACED    =   6   # when the project definition is replaced
1755
                        # spontaneously goes back to ACCEPTED when synced
1756 1694

  
1757 1695
    class Meta:
1758 1696
        unique_together = ("person", "project")
......
1765 1703
    __repr__ = __str__
1766 1704

  
1767 1705
    def __init__(self, *args, **kwargs):
1768
        self.sync_init_state(self.REQUEST)
1706
        self.state = self.REQUESTED
1769 1707
        super(ProjectMembership, self).__init__(*args, **kwargs)
1770 1708

  
1771 1709
    def _set_history_item(self, reason, date=None):
......
1791 1729
        self._set_history_item(reason='ACCEPT', date=now)
1792 1730
        self.state = self.PENDING
1793 1731
        self.save()
1732
        trigger_sync()
1794 1733

  
1795 1734
    def remove(self):
1796 1735
        if state != self.ACCEPTED:
1797 1736
            m = _("%s: attempt to remove in state '%s'") % (self, state)
1798 1737
            raise AssertionError(m)
1799 1738

  
1800
        serial = self._set_history_item(reason='REMOVE')
1739
        self._set_history_item(reason='REMOVE')
1801 1740
        self.state = self.REMOVING
1802 1741
        self.save()
1742
        trigger_sync()
1803 1743

  
1804 1744
    def reject(self):
1805 1745
        if state != self.REQUESTED:
......
1850 1790
                    import_limit = cur_grant.import_limit
1851 1791
                    export_limit = cur_grant.export_limit
1852 1792

  
1853
                capacity += factor * new_grant.member_capacity
1854
                import_limit += factor * new_grant.member_import_limit
1855
                export_limit += factor * new_grant.member_export_limit
1793
                capacity += new_grant.member_capacity
1794
                import_limit += new_grant.member_import_limit
1795
                export_limit += new_grant.member_export_limit
1856 1796

  
1857 1797
                append(QuotaLimits(holder       = holder,
1858 1798
                                   key          = key,
......
1865 1805
        limits_list.extend(tmp_grants.itervalues())
1866 1806
        return limits_list
1867 1807

  
1808
    def set_sync(self):
1809
        state = self.state
1810
        if state == self.PENDING:
1811
            pending_application = self.pending_application
1812
            if pending_application is None:
1813
                m = _("%s: attempt to sync an empty pending application") % (
1814
                    self, state)
1815
                raise AssertionError(m)
1816
            self.application = pending_application
1817
            self.pending_application = None
1818
            self.pending_serial = None
1819

  
1820
            # project.application may have changed in the meantime,
1821
            # in which case we stay PENDING;
1822
            # we are safe to check due to select_for_update
1823
            if self.application == self.project.application:
1824
                self.state = self.ACCEPTED
1825
            self.save()
1826
        elif state == self.REMOVING:
1827
            self.delete()
1828
        else:
1829
            m = _("%s: attempt to sync in state '%s'") % (self, state)
1830
            raise AssertionError(m)
1831

  
1832
class Serial(models.Model):
1833
    serial  =   models.AutoField(primary_key=True)
1834

  
1835
def new_serial():
1836
    s = Serial.create()
1837
    return s.serial
1868 1838

  
1869 1839
def sync_finish_serials():
1870
    serials_to_ack = set(quotaholder_query_sync_serials([]))
1840
    serials_to_ack = set(qh_query_serials([]))
1871 1841
    sfu = ProjectMembership.objects.select_for_update()
1872 1842
    memberships = sfu.filter(pending_serial__isnull=False)
1873 1843

  
......
1879 1849
            membership.set_sync()
1880 1850

  
1881 1851
    transaction.commit()
1882
    quotaholder_ack_sync_serials(list(serials_to_ack))
1883

  
1852
    qh_ack_serials(list(serials_to_ack))
1884 1853

  
1885 1854
def sync_projects():
1886 1855
    sync_finish_serials()
......
1889 1858
    REMOVING = ProjectMembership.REMOVING
1890 1859
    objects = ProjectMembership.objects.select_for_update()
1891 1860

  
1892
    projects = set()
1893 1861
    quotas = []
1894 1862

  
1895
    serial = get_serial()
1863
    serial = new_serial()
1896 1864

  
1897 1865
    pending = objects.filter(state=PENDING)
1898 1866
    for membership in pending:
......
1906 1874
                membership, membership.pending_serial)
1907 1875
            raise AssertionError(m)
1908 1876

  
1909
        membership.pending_application = membership.project.latest_application
1877
        membership.pending_application = membership.project.application
1910 1878
        membership.pending_serial = serial
1911 1879
        membership.get_diff_quotas(quotas)
1912 1880
        membership.save()
......
1928 1896
        membership.save()
1929 1897

  
1930 1898
    transaction.commit()
1931
    quotaholder_add_quotas(serial, quotas)
1899
    # ProjectApplication.approve() unblocks here
1900
    # and can set PENDING an already PENDING membership
1901
    # which has been scheduled to sync with the old project.application
1902
    # Need to check in ProjectMembership.set_sync()
1903

  
1904
    qh_add_quota(serial, quotas)
1932 1905
    sync_finish_serials()
1933 1906

  
1934 1907

  

Also available in: Unified diff