Revision 78e1f8da snf-pithos-backend/pithos/backends/modular.py
b/snf-pithos-backend/pithos/backends/modular.py | ||
---|---|---|
116 | 116 |
|
117 | 117 |
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3) |
118 | 118 |
|
119 |
QUOTA_POLICY = 'quota' |
|
120 |
VERSIONING_POLICY = 'versioning' |
|
121 |
PROJECT = 'project' |
|
122 |
|
|
119 | 123 |
inf = float('inf') |
120 | 124 |
|
121 | 125 |
ULTIMATE_ANSWER = 42 |
122 | 126 |
|
123 |
DEFAULT_SOURCE = 'system' |
|
124 | 127 |
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace' |
125 | 128 |
|
126 | 129 |
logger = logging.getLogger(__name__) |
... | ... | |
245 | 248 |
container_versioning_policy = container_versioning_policy \ |
246 | 249 |
or DEFAULT_CONTAINER_VERSIONING |
247 | 250 |
|
248 |
self.default_account_policy = {'quota': account_quota_policy}
|
|
251 |
self.default_account_policy = {} |
|
249 | 252 |
self.default_container_policy = { |
250 |
'quota': container_quota_policy, |
|
251 |
'versioning': container_versioning_policy |
|
253 |
QUOTA_POLICY: container_quota_policy, |
|
254 |
VERSIONING_POLICY: container_versioning_policy, |
|
255 |
PROJECT: None |
|
252 | 256 |
} |
253 | 257 |
#queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS |
254 | 258 |
#queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE |
... | ... | |
390 | 394 |
|
391 | 395 |
return self._allowed_accounts(user) |
392 | 396 |
|
393 |
def _get_account_quotas(self, account): |
|
394 |
"""Get account usage from astakos.""" |
|
395 |
|
|
396 |
quotas = self.astakosclient.service_get_quotas(account)[account] |
|
397 |
return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE, |
|
398 |
{}) |
|
399 |
|
|
400 |
def _get_account_quotas(self, account): |
|
401 |
"""Get account usage from astakos.""" |
|
402 |
|
|
403 |
quotas = self.astakosclient.service_get_quotas(account)[account] |
|
404 |
return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE, |
|
405 |
{}) |
|
406 |
|
|
407 | 397 |
@debug_method |
408 | 398 |
@backend_method |
409 |
def get_account_meta(self, user, account, domain, until=None, |
|
399 |
def get_account_meta(self, user, account, domain=None, until=None,
|
|
410 | 400 |
include_user_defined=True): |
411 | 401 |
"""Return a dictionary with the account metadata for the domain.""" |
412 | 402 |
|
... | ... | |
435 | 425 |
else: |
436 | 426 |
meta = {} |
437 | 427 |
if props is not None and include_user_defined: |
428 |
if domain is None: |
|
429 |
raise ValueError( |
|
430 |
'Domain argument is obligatory for getting ' |
|
431 |
'user defined metadata') |
|
438 | 432 |
meta.update( |
439 | 433 |
dict(self.node.attribute_get(props[self.SERIAL], domain))) |
440 | 434 |
if until is not None: |
441 | 435 |
meta.update({'until_timestamp': tstamp}) |
442 | 436 |
meta.update({'name': account, 'count': count, 'bytes': bytes}) |
443 | 437 |
if self.using_external_quotaholder: |
444 |
external_quota = self._get_account_quotas(account) |
|
445 |
meta['bytes'] = external_quota.get('usage', 0) |
|
438 |
external_quota = self.astakosclient.service_get_quotas( |
|
439 |
account)[account] |
|
440 |
meta['bytes'] = sum(d['pithos.diskspace']['usage'] for d in |
|
441 |
external_quota.values()) |
|
446 | 442 |
meta.update({'modified': modified}) |
447 | 443 |
return meta |
448 | 444 |
|
... | ... | |
494 | 490 |
path, node = self._lookup_account(account, True) |
495 | 491 |
policy = self._get_policy(node, is_account_policy=True) |
496 | 492 |
if self.using_external_quotaholder: |
497 |
external_quota = self._get_account_quotas(account) |
|
498 |
policy['quota'] = external_quota.get('limit', 0) |
|
493 |
external_quota = self.astakosclient.service_get_quotas( |
|
494 |
account)[account] |
|
495 |
policy.update(dict(('%s-%s' % (QUOTA_POLICY, k), |
|
496 |
v['pithos.diskspace']['limit']) for k, v in |
|
497 |
external_quota.items())) |
|
498 |
|
|
499 | 499 |
return policy |
500 | 500 |
|
501 | 501 |
@debug_method |
... | ... | |
505 | 505 |
|
506 | 506 |
self._can_write_account(user, account) |
507 | 507 |
path, node = self._lookup_account(account, True) |
508 |
self._check_policy(policy, is_account_policy=True)
|
|
509 |
self._put_policy(node, policy, replace, is_account_policy=True)
|
|
508 |
self._put_policy(node, policy, replace, is_account_policy=True,
|
|
509 |
check=True)
|
|
510 | 510 |
|
511 | 511 |
@debug_method |
512 | 512 |
@backend_method |
... | ... | |
518 | 518 |
node = self.node.node_lookup(account) |
519 | 519 |
if node is not None: |
520 | 520 |
raise AccountExists('Account already exists') |
521 |
if policy: |
|
522 |
self._check_policy(policy, is_account_policy=True) |
|
523 | 521 |
node = self._put_path(user, self.ROOTNODE, account, |
524 | 522 |
update_statistics_ancestors_depth=-1) |
525 |
self._put_policy(node, policy, True, is_account_policy=True) |
|
523 |
self._put_policy(node, policy, True, is_account_policy=True, |
|
524 |
check=True if policy else False) |
|
526 | 525 |
|
527 | 526 |
@debug_method |
528 | 527 |
@backend_method |
... | ... | |
586 | 585 |
|
587 | 586 |
@debug_method |
588 | 587 |
@backend_method |
589 |
def get_container_meta(self, user, account, container, domain, until=None,
|
|
590 |
include_user_defined=True): |
|
588 |
def get_container_meta(self, user, account, container, domain=None, |
|
589 |
until=None, include_user_defined=True):
|
|
591 | 590 |
"""Return a dictionary with the container metadata for the domain.""" |
592 | 591 |
|
593 | 592 |
self._can_read_container(user, account, container) |
... | ... | |
611 | 610 |
else: |
612 | 611 |
meta = {} |
613 | 612 |
if include_user_defined: |
613 |
if domain is None: |
|
614 |
raise ValueError( |
|
615 |
'Domain argument is obligatory for getting ' |
|
616 |
'user defined metadata') |
|
614 | 617 |
meta.update( |
615 | 618 |
dict(self.node.attribute_get(props[self.SERIAL], domain))) |
616 | 619 |
if until is not None: |
... | ... | |
632 | 635 |
update_statistics_ancestors_depth=0) |
633 | 636 |
if src_version_id is not None: |
634 | 637 |
versioning = self._get_policy( |
635 |
node, is_account_policy=False)['versioning']
|
|
638 |
node, is_account_policy=False)[VERSIONING_POLICY]
|
|
636 | 639 |
if versioning != 'auto': |
637 | 640 |
self.node.version_remove(src_version_id, |
638 | 641 |
update_statistics_ancestors_depth=0) |
... | ... | |
656 | 659 |
|
657 | 660 |
self._can_write_container(user, account, container) |
658 | 661 |
path, node = self._lookup_container(account, container) |
659 |
self._check_policy(policy, is_account_policy=False) |
|
660 |
self._put_policy(node, policy, replace, is_account_policy=False) |
|
662 |
|
|
663 |
if PROJECT in policy: |
|
664 |
project = self._get_project(node) |
|
665 |
try: |
|
666 |
serial = self.astakosclient.issue_resource_reassignment( |
|
667 |
holder=account, |
|
668 |
from_source=project, |
|
669 |
to_source=policy[PROJECT], |
|
670 |
provisions={'pithos.diskspace': self.get_container_meta( |
|
671 |
user, account, container, |
|
672 |
include_user_defined=False)['bytes']}) |
|
673 |
except BaseException, e: |
|
674 |
raise QuotaError(e) |
|
675 |
else: |
|
676 |
self.serials.append(serial) |
|
677 |
|
|
678 |
self._put_policy(node, policy, replace, is_account_policy=False, |
|
679 |
default_project=account, check=True) |
|
661 | 680 |
|
662 | 681 |
@debug_method |
663 | 682 |
@backend_method |
... | ... | |
672 | 691 |
pass |
673 | 692 |
else: |
674 | 693 |
raise ContainerExists('Container already exists') |
675 |
if policy: |
|
676 |
self._check_policy(policy, is_account_policy=False) |
|
677 | 694 |
path = '/'.join((account, container)) |
678 | 695 |
node = self._put_path( |
679 | 696 |
user, self._lookup_account(account, True)[1], path, |
680 | 697 |
update_statistics_ancestors_depth=-1) |
681 |
self._put_policy(node, policy, True, is_account_policy=False) |
|
698 |
self._put_policy(node, policy, True, is_account_policy=False, |
|
699 |
default_project=account, |
|
700 |
check=True if policy else False) |
|
682 | 701 |
|
683 | 702 |
@debug_method |
684 | 703 |
@backend_method |
... | ... | |
688 | 707 |
|
689 | 708 |
self._can_write_container(user, account, container) |
690 | 709 |
path, node = self._lookup_container(account, container) |
710 |
project = self._get_project(node) |
|
691 | 711 |
|
692 | 712 |
if until is not None: |
693 | 713 |
hashes, size, serials = self.node.node_purge_children( |
... | ... | |
699 | 719 |
update_statistics_ancestors_depth=0) |
700 | 720 |
if not self.free_versioning: |
701 | 721 |
self._report_size_change( |
702 |
user, account, -size, { |
|
722 |
user, account, -size, project, {
|
|
703 | 723 |
'action': 'container purge', |
704 | 724 |
'path': path, |
705 | 725 |
'versions': ','.join(str(i) for i in serials) |
... | ... | |
720 | 740 |
self.node.node_remove(node, update_statistics_ancestors_depth=0) |
721 | 741 |
if not self.free_versioning: |
722 | 742 |
self._report_size_change( |
723 |
user, account, -size, { |
|
743 |
user, account, -size, project, {
|
|
724 | 744 |
'action': 'container purge', |
725 | 745 |
'path': path, |
726 | 746 |
'versions': ','.join(str(i) for i in serials) |
... | ... | |
746 | 766 |
account, container, src_version_id, |
747 | 767 |
update_statistics_ancestors_depth=1) |
748 | 768 |
self._report_size_change( |
749 |
user, account, -del_size, { |
|
769 |
user, account, -del_size, project, {
|
|
750 | 770 |
'action': 'object delete', |
751 | 771 |
'path': path, |
752 | 772 |
'versions': ','.join([str(dest_version_id)])}) |
... | ... | |
915 | 935 |
|
916 | 936 |
@debug_method |
917 | 937 |
@backend_method |
918 |
def get_object_meta(self, user, account, container, name, domain, |
|
938 |
def get_object_meta(self, user, account, container, name, domain=None,
|
|
919 | 939 |
version=None, include_user_defined=True): |
920 | 940 |
"""Return a dictionary with the object metadata for the domain.""" |
921 | 941 |
|
... | ... | |
937 | 957 |
|
938 | 958 |
meta = {} |
939 | 959 |
if include_user_defined: |
960 |
if domain is None: |
|
961 |
raise ValueError( |
|
962 |
'Domain argument is obligatory for getting ' |
|
963 |
'user defined metadata') |
|
940 | 964 |
meta.update( |
941 | 965 |
dict(self.node.attribute_get(props[self.SERIAL], domain))) |
942 | 966 |
meta.update({'name': name, |
... | ... | |
1095 | 1119 |
account_path, account_node = self._lookup_account(account, True) |
1096 | 1120 |
container_path, container_node = self._lookup_container( |
1097 | 1121 |
account, container) |
1122 |
project = self._get_project(container_node) |
|
1098 | 1123 |
|
1099 | 1124 |
path, node = self._put_object_node( |
1100 | 1125 |
container_path, container_node, name) |
... | ... | |
1116 | 1141 |
# Check account quota. |
1117 | 1142 |
if not self.using_external_quotaholder: |
1118 | 1143 |
account_quota = long(self._get_policy( |
1119 |
account_node, is_account_policy=True)['quota'])
|
|
1144 |
account_node, is_account_policy=True)[QUOTA_POLICY])
|
|
1120 | 1145 |
account_usage = self._get_statistics(account_node, |
1121 | 1146 |
compute=True)[1] |
1122 | 1147 |
if (account_quota > 0 and account_usage > account_quota): |
... | ... | |
1126 | 1151 |
|
1127 | 1152 |
# Check container quota. |
1128 | 1153 |
container_quota = long(self._get_policy( |
1129 |
container_node, is_account_policy=False)['quota'])
|
|
1154 |
container_node, is_account_policy=False)[QUOTA_POLICY])
|
|
1130 | 1155 |
container_usage = self._get_statistics(container_node)[1] |
1131 | 1156 |
if (container_quota > 0 and container_usage > container_quota): |
1132 | 1157 |
# This must be executed in a transaction, so the version is |
... | ... | |
1139 | 1164 |
|
1140 | 1165 |
if report_size_change: |
1141 | 1166 |
self._report_size_change( |
1142 |
user, account, size_delta, |
|
1167 |
user, account, size_delta, project,
|
|
1143 | 1168 |
{'action': 'object update', 'path': path, |
1144 | 1169 |
'versions': ','.join([str(dest_version_id)])}) |
1145 | 1170 |
if permissions is not None: |
... | ... | |
1312 | 1337 |
if user != account: |
1313 | 1338 |
raise NotAllowedError |
1314 | 1339 |
|
1315 |
# lookup object and lock container path also |
|
1316 |
path, node = self._lookup_object(account, container, name, |
|
1317 |
lock_container=True) |
|
1340 |
# lock container path |
|
1341 |
container_path, container_node = self._lookup_container(account, |
|
1342 |
container) |
|
1343 |
project = self._get_project(container_node) |
|
1344 |
path, node = self._lookup_object(account, container, name) |
|
1318 | 1345 |
|
1319 | 1346 |
if until is not None: |
1320 | 1347 |
if node is None: |
... | ... | |
1342 | 1369 |
except NameError: |
1343 | 1370 |
self.permissions.access_clear(path) |
1344 | 1371 |
self._report_size_change( |
1345 |
user, account, -size, { |
|
1372 |
user, account, -size, project, {
|
|
1346 | 1373 |
'action': 'object purge', |
1347 | 1374 |
'path': path, |
1348 | 1375 |
'versions': ','.join(str(i) for i in serials) |
... | ... | |
1360 | 1387 |
update_statistics_ancestors_depth=1) |
1361 | 1388 |
if report_size_change: |
1362 | 1389 |
self._report_size_change( |
1363 |
user, account, -del_size, |
|
1390 |
user, account, -del_size, project,
|
|
1364 | 1391 |
{'action': 'object delete', |
1365 | 1392 |
'path': path, |
1366 | 1393 |
'versions': ','.join([str(dest_version_id)])}) |
... | ... | |
1389 | 1416 |
update_statistics_ancestors_depth=1) |
1390 | 1417 |
if report_size_change: |
1391 | 1418 |
self._report_size_change( |
1392 |
user, account, -del_size, |
|
1419 |
user, account, -del_size, project,
|
|
1393 | 1420 |
{'action': 'object delete', |
1394 | 1421 |
'path': path, |
1395 | 1422 |
'versions': ','.join([str(dest_version_id)])}) |
... | ... | |
1677 | 1704 |
|
1678 | 1705 |
@debug_method |
1679 | 1706 |
@backend_method |
1680 |
def _report_size_change(self, user, account, size, details=None): |
|
1707 |
def _report_size_change(self, user, account, size, source, details=None):
|
|
1681 | 1708 |
details = details or {} |
1682 | 1709 |
|
1683 | 1710 |
if size == 0: |
... | ... | |
1697 | 1724 |
name = details['path'] if 'path' in details else '' |
1698 | 1725 |
serial = self.astakosclient.issue_one_commission( |
1699 | 1726 |
holder=account, |
1700 |
source=DEFAULT_SOURCE,
|
|
1727 |
source=source,
|
|
1701 | 1728 |
provisions={'pithos.diskspace': size}, |
1702 | 1729 |
name=name) |
1703 | 1730 |
except BaseException, e: |
... | ... | |
1725 | 1752 |
|
1726 | 1753 |
# Policy functions. |
1727 | 1754 |
|
1728 |
def _check_policy(self, policy, is_account_policy=True): |
|
1729 |
default_policy = self.default_account_policy \ |
|
1730 |
if is_account_policy else self.default_container_policy |
|
1731 |
for k in policy.keys(): |
|
1732 |
if policy[k] == '': |
|
1733 |
policy[k] = default_policy.get(k) |
|
1755 |
def _check_project(self, value): |
|
1756 |
# raise ValueError('Bad quota source policy') |
|
1757 |
pass |
|
1758 |
|
|
1759 |
def _check_policy(self, policy): |
|
1734 | 1760 |
for k, v in policy.iteritems(): |
1735 |
if k == 'quota':
|
|
1761 |
if k == QUOTA_POLICY:
|
|
1736 | 1762 |
q = int(v) # May raise ValueError. |
1737 | 1763 |
if q < 0: |
1738 | 1764 |
raise ValueError |
1739 |
elif k == 'versioning':
|
|
1765 |
elif k == VERSIONING_POLICY:
|
|
1740 | 1766 |
if v not in ['auto', 'none']: |
1741 | 1767 |
raise ValueError |
1768 |
elif k == PROJECT: |
|
1769 |
self._check_project(v) |
|
1742 | 1770 |
else: |
1743 | 1771 |
raise ValueError |
1744 | 1772 |
|
1745 |
def _put_policy(self, node, policy, replace, is_account_policy=True): |
|
1746 |
default_policy = self.default_account_policy \ |
|
1747 |
if is_account_policy else self.default_container_policy |
|
1773 |
def _get_default_policy(self, node=None, is_account_policy=True, |
|
1774 |
default_project=None): |
|
1775 |
if is_account_policy: |
|
1776 |
default_policy = self.default_account_policy |
|
1777 |
else: |
|
1778 |
default_policy = self.default_container_policy |
|
1779 |
if default_project is None and node is not None: |
|
1780 |
# set container's account as the default quota source |
|
1781 |
default_project = self.node.node_get_parent_path(node) |
|
1782 |
default_policy[PROJECT] = default_project |
|
1783 |
return default_policy |
|
1784 |
|
|
1785 |
def _put_policy(self, node, policy, replace, |
|
1786 |
is_account_policy=True, default_project=None, |
|
1787 |
check=True): |
|
1788 |
default_policy = self._get_default_policy(node, |
|
1789 |
is_account_policy, |
|
1790 |
default_project) |
|
1748 | 1791 |
if replace: |
1749 | 1792 |
for k, v in default_policy.iteritems(): |
1750 | 1793 |
if k not in policy: |
1751 | 1794 |
policy[k] = v |
1795 |
if check: |
|
1796 |
self._check_policy(policy) |
|
1797 |
|
|
1752 | 1798 |
self.node.policy_set(node, policy) |
1753 | 1799 |
|
1754 |
def _get_policy(self, node, is_account_policy=True): |
|
1755 |
default_policy = self.default_account_policy \ |
|
1756 |
if is_account_policy else self.default_container_policy |
|
1800 |
def _get_policy(self, node, is_account_policy=True, |
|
1801 |
default_project=None): |
|
1802 |
default_policy = self._get_default_policy(node, |
|
1803 |
is_account_policy, |
|
1804 |
default_project) |
|
1757 | 1805 |
policy = default_policy.copy() |
1758 | 1806 |
policy.update(self.node.policy_get(node)) |
1759 | 1807 |
return policy |
1760 | 1808 |
|
1809 |
def _get_project(self, node): |
|
1810 |
policy = self._get_policy(node, is_account_policy=False) |
|
1811 |
return policy[PROJECT] |
|
1812 |
|
|
1761 | 1813 |
def _apply_versioning(self, account, container, version_id, |
1762 | 1814 |
update_statistics_ancestors_depth=None): |
1763 | 1815 |
"""Delete the provided version if such is the policy. |
... | ... | |
1768 | 1820 |
return 0 |
1769 | 1821 |
path, node = self._lookup_container(account, container) |
1770 | 1822 |
versioning = self._get_policy( |
1771 |
node, is_account_policy=False)['versioning']
|
|
1823 |
node, is_account_policy=False)[VERSIONING_POLICY]
|
|
1772 | 1824 |
if versioning != 'auto': |
1773 | 1825 |
hash, size = self.node.version_remove( |
1774 | 1826 |
version_id, update_statistics_ancestors_depth) |
Also available in: Unified diff