116 |
116 |
|
117 |
117 |
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
|
118 |
118 |
|
|
119 |
QUOTA_POLICY = 'quota'
|
|
120 |
VERSIONING_POLICY = 'versioning'
|
|
121 |
PROJECT = 'project'
|
|
122 |
|
119 |
123 |
inf = float('inf')
|
120 |
124 |
|
121 |
125 |
ULTIMATE_ANSWER = 42
|
122 |
126 |
|
123 |
|
DEFAULT_SOURCE = 'system'
|
124 |
127 |
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'
|
125 |
128 |
|
126 |
129 |
logger = logging.getLogger(__name__)
|
... | ... | |
245 |
248 |
container_versioning_policy = container_versioning_policy \
|
246 |
249 |
or DEFAULT_CONTAINER_VERSIONING
|
247 |
250 |
|
248 |
|
self.default_account_policy = {'quota': account_quota_policy}
|
|
251 |
self.default_account_policy = {}
|
249 |
252 |
self.default_container_policy = {
|
250 |
|
'quota': container_quota_policy,
|
251 |
|
'versioning': container_versioning_policy
|
|
253 |
QUOTA_POLICY: container_quota_policy,
|
|
254 |
VERSIONING_POLICY: container_versioning_policy,
|
|
255 |
PROJECT: None
|
252 |
256 |
}
|
253 |
257 |
#queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
|
254 |
258 |
#queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
|
... | ... | |
390 |
394 |
|
391 |
395 |
return self._allowed_accounts(user)
|
392 |
396 |
|
393 |
|
def _get_account_quotas(self, account):
|
394 |
|
"""Get account usage from astakos."""
|
395 |
|
|
396 |
|
quotas = self.astakosclient.service_get_quotas(account)[account]
|
397 |
|
return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
|
398 |
|
{})
|
399 |
|
|
400 |
|
def _get_account_quotas(self, account):
|
401 |
|
"""Get account usage from astakos."""
|
402 |
|
|
403 |
|
quotas = self.astakosclient.service_get_quotas(account)[account]
|
404 |
|
return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
|
405 |
|
{})
|
406 |
|
|
407 |
397 |
@debug_method
|
408 |
398 |
@backend_method
|
409 |
|
def get_account_meta(self, user, account, domain, until=None,
|
|
399 |
def get_account_meta(self, user, account, domain=None, until=None,
|
410 |
400 |
include_user_defined=True):
|
411 |
401 |
"""Return a dictionary with the account metadata for the domain."""
|
412 |
402 |
|
... | ... | |
435 |
425 |
else:
|
436 |
426 |
meta = {}
|
437 |
427 |
if props is not None and include_user_defined:
|
|
428 |
if domain is None:
|
|
429 |
raise ValueError(
|
|
430 |
'Domain argument is obligatory for getting '
|
|
431 |
'user defined metadata')
|
438 |
432 |
meta.update(
|
439 |
433 |
dict(self.node.attribute_get(props[self.SERIAL], domain)))
|
440 |
434 |
if until is not None:
|
441 |
435 |
meta.update({'until_timestamp': tstamp})
|
442 |
436 |
meta.update({'name': account, 'count': count, 'bytes': bytes})
|
443 |
437 |
if self.using_external_quotaholder:
|
444 |
|
external_quota = self._get_account_quotas(account)
|
445 |
|
meta['bytes'] = external_quota.get('usage', 0)
|
|
438 |
external_quota = self.astakosclient.service_get_quotas(
|
|
439 |
account)[account]
|
|
440 |
meta['bytes'] = sum(d['pithos.diskspace']['usage'] for d in
|
|
441 |
external_quota.values())
|
446 |
442 |
meta.update({'modified': modified})
|
447 |
443 |
return meta
|
448 |
444 |
|
... | ... | |
494 |
490 |
path, node = self._lookup_account(account, True)
|
495 |
491 |
policy = self._get_policy(node, is_account_policy=True)
|
496 |
492 |
if self.using_external_quotaholder:
|
497 |
|
external_quota = self._get_account_quotas(account)
|
498 |
|
policy['quota'] = external_quota.get('limit', 0)
|
|
493 |
external_quota = self.astakosclient.service_get_quotas(
|
|
494 |
account)[account]
|
|
495 |
policy.update(dict(('%s-%s' % (QUOTA_POLICY, k),
|
|
496 |
v['pithos.diskspace']['limit']) for k, v in
|
|
497 |
external_quota.items()))
|
|
498 |
|
499 |
499 |
return policy
|
500 |
500 |
|
501 |
501 |
@debug_method
|
... | ... | |
505 |
505 |
|
506 |
506 |
self._can_write_account(user, account)
|
507 |
507 |
path, node = self._lookup_account(account, True)
|
508 |
|
self._check_policy(policy, is_account_policy=True)
|
509 |
|
self._put_policy(node, policy, replace, is_account_policy=True)
|
|
508 |
self._put_policy(node, policy, replace, is_account_policy=True,
|
|
509 |
check=True)
|
510 |
510 |
|
511 |
511 |
@debug_method
|
512 |
512 |
@backend_method
|
... | ... | |
518 |
518 |
node = self.node.node_lookup(account)
|
519 |
519 |
if node is not None:
|
520 |
520 |
raise AccountExists('Account already exists')
|
521 |
|
if policy:
|
522 |
|
self._check_policy(policy, is_account_policy=True)
|
523 |
521 |
node = self._put_path(user, self.ROOTNODE, account,
|
524 |
522 |
update_statistics_ancestors_depth=-1)
|
525 |
|
self._put_policy(node, policy, True, is_account_policy=True)
|
|
523 |
self._put_policy(node, policy, True, is_account_policy=True,
|
|
524 |
check=True if policy else False)
|
526 |
525 |
|
527 |
526 |
@debug_method
|
528 |
527 |
@backend_method
|
... | ... | |
586 |
585 |
|
587 |
586 |
@debug_method
|
588 |
587 |
@backend_method
|
589 |
|
def get_container_meta(self, user, account, container, domain, until=None,
|
590 |
|
include_user_defined=True):
|
|
588 |
def get_container_meta(self, user, account, container, domain=None,
|
|
589 |
until=None, include_user_defined=True):
|
591 |
590 |
"""Return a dictionary with the container metadata for the domain."""
|
592 |
591 |
|
593 |
592 |
self._can_read_container(user, account, container)
|
... | ... | |
611 |
610 |
else:
|
612 |
611 |
meta = {}
|
613 |
612 |
if include_user_defined:
|
|
613 |
if domain is None:
|
|
614 |
raise ValueError(
|
|
615 |
'Domain argument is obligatory for getting '
|
|
616 |
'user defined metadata')
|
614 |
617 |
meta.update(
|
615 |
618 |
dict(self.node.attribute_get(props[self.SERIAL], domain)))
|
616 |
619 |
if until is not None:
|
... | ... | |
632 |
635 |
update_statistics_ancestors_depth=0)
|
633 |
636 |
if src_version_id is not None:
|
634 |
637 |
versioning = self._get_policy(
|
635 |
|
node, is_account_policy=False)['versioning']
|
|
638 |
node, is_account_policy=False)[VERSIONING_POLICY]
|
636 |
639 |
if versioning != 'auto':
|
637 |
640 |
self.node.version_remove(src_version_id,
|
638 |
641 |
update_statistics_ancestors_depth=0)
|
... | ... | |
656 |
659 |
|
657 |
660 |
self._can_write_container(user, account, container)
|
658 |
661 |
path, node = self._lookup_container(account, container)
|
659 |
|
self._check_policy(policy, is_account_policy=False)
|
660 |
|
self._put_policy(node, policy, replace, is_account_policy=False)
|
|
662 |
|
|
663 |
if PROJECT in policy:
|
|
664 |
project = self._get_project(node)
|
|
665 |
try:
|
|
666 |
serial = self.astakosclient.issue_resource_reassignment(
|
|
667 |
holder=account,
|
|
668 |
from_source=project,
|
|
669 |
to_source=policy[PROJECT],
|
|
670 |
provisions={'pithos.diskspace': self.get_container_meta(
|
|
671 |
user, account, container,
|
|
672 |
include_user_defined=False)['bytes']})
|
|
673 |
except BaseException, e:
|
|
674 |
raise QuotaError(e)
|
|
675 |
else:
|
|
676 |
self.serials.append(serial)
|
|
677 |
|
|
678 |
self._put_policy(node, policy, replace, is_account_policy=False,
|
|
679 |
default_project=account, check=True)
|
661 |
680 |
|
662 |
681 |
@debug_method
|
663 |
682 |
@backend_method
|
... | ... | |
672 |
691 |
pass
|
673 |
692 |
else:
|
674 |
693 |
raise ContainerExists('Container already exists')
|
675 |
|
if policy:
|
676 |
|
self._check_policy(policy, is_account_policy=False)
|
677 |
694 |
path = '/'.join((account, container))
|
678 |
695 |
node = self._put_path(
|
679 |
696 |
user, self._lookup_account(account, True)[1], path,
|
680 |
697 |
update_statistics_ancestors_depth=-1)
|
681 |
|
self._put_policy(node, policy, True, is_account_policy=False)
|
|
698 |
self._put_policy(node, policy, True, is_account_policy=False,
|
|
699 |
default_project=account,
|
|
700 |
check=True if policy else False)
|
682 |
701 |
|
683 |
702 |
@debug_method
|
684 |
703 |
@backend_method
|
... | ... | |
688 |
707 |
|
689 |
708 |
self._can_write_container(user, account, container)
|
690 |
709 |
path, node = self._lookup_container(account, container)
|
|
710 |
project = self._get_project(node)
|
691 |
711 |
|
692 |
712 |
if until is not None:
|
693 |
713 |
hashes, size, serials = self.node.node_purge_children(
|
... | ... | |
699 |
719 |
update_statistics_ancestors_depth=0)
|
700 |
720 |
if not self.free_versioning:
|
701 |
721 |
self._report_size_change(
|
702 |
|
user, account, -size, {
|
|
722 |
user, account, -size, project, {
|
703 |
723 |
'action': 'container purge',
|
704 |
724 |
'path': path,
|
705 |
725 |
'versions': ','.join(str(i) for i in serials)
|
... | ... | |
720 |
740 |
self.node.node_remove(node, update_statistics_ancestors_depth=0)
|
721 |
741 |
if not self.free_versioning:
|
722 |
742 |
self._report_size_change(
|
723 |
|
user, account, -size, {
|
|
743 |
user, account, -size, project, {
|
724 |
744 |
'action': 'container purge',
|
725 |
745 |
'path': path,
|
726 |
746 |
'versions': ','.join(str(i) for i in serials)
|
... | ... | |
746 |
766 |
account, container, src_version_id,
|
747 |
767 |
update_statistics_ancestors_depth=1)
|
748 |
768 |
self._report_size_change(
|
749 |
|
user, account, -del_size, {
|
|
769 |
user, account, -del_size, project, {
|
750 |
770 |
'action': 'object delete',
|
751 |
771 |
'path': path,
|
752 |
772 |
'versions': ','.join([str(dest_version_id)])})
|
... | ... | |
915 |
935 |
|
916 |
936 |
@debug_method
|
917 |
937 |
@backend_method
|
918 |
|
def get_object_meta(self, user, account, container, name, domain,
|
|
938 |
def get_object_meta(self, user, account, container, name, domain=None,
|
919 |
939 |
version=None, include_user_defined=True):
|
920 |
940 |
"""Return a dictionary with the object metadata for the domain."""
|
921 |
941 |
|
... | ... | |
937 |
957 |
|
938 |
958 |
meta = {}
|
939 |
959 |
if include_user_defined:
|
|
960 |
if domain is None:
|
|
961 |
raise ValueError(
|
|
962 |
'Domain argument is obligatory for getting '
|
|
963 |
'user defined metadata')
|
940 |
964 |
meta.update(
|
941 |
965 |
dict(self.node.attribute_get(props[self.SERIAL], domain)))
|
942 |
966 |
meta.update({'name': name,
|
... | ... | |
1095 |
1119 |
account_path, account_node = self._lookup_account(account, True)
|
1096 |
1120 |
container_path, container_node = self._lookup_container(
|
1097 |
1121 |
account, container)
|
|
1122 |
project = self._get_project(container_node)
|
1098 |
1123 |
|
1099 |
1124 |
path, node = self._put_object_node(
|
1100 |
1125 |
container_path, container_node, name)
|
... | ... | |
1116 |
1141 |
# Check account quota.
|
1117 |
1142 |
if not self.using_external_quotaholder:
|
1118 |
1143 |
account_quota = long(self._get_policy(
|
1119 |
|
account_node, is_account_policy=True)['quota'])
|
|
1144 |
account_node, is_account_policy=True)[QUOTA_POLICY])
|
1120 |
1145 |
account_usage = self._get_statistics(account_node,
|
1121 |
1146 |
compute=True)[1]
|
1122 |
1147 |
if (account_quota > 0 and account_usage > account_quota):
|
... | ... | |
1126 |
1151 |
|
1127 |
1152 |
# Check container quota.
|
1128 |
1153 |
container_quota = long(self._get_policy(
|
1129 |
|
container_node, is_account_policy=False)['quota'])
|
|
1154 |
container_node, is_account_policy=False)[QUOTA_POLICY])
|
1130 |
1155 |
container_usage = self._get_statistics(container_node)[1]
|
1131 |
1156 |
if (container_quota > 0 and container_usage > container_quota):
|
1132 |
1157 |
# This must be executed in a transaction, so the version is
|
... | ... | |
1139 |
1164 |
|
1140 |
1165 |
if report_size_change:
|
1141 |
1166 |
self._report_size_change(
|
1142 |
|
user, account, size_delta,
|
|
1167 |
user, account, size_delta, project,
|
1143 |
1168 |
{'action': 'object update', 'path': path,
|
1144 |
1169 |
'versions': ','.join([str(dest_version_id)])})
|
1145 |
1170 |
if permissions is not None:
|
... | ... | |
1312 |
1337 |
if user != account:
|
1313 |
1338 |
raise NotAllowedError
|
1314 |
1339 |
|
1315 |
|
# lookup object and lock container path also
|
1316 |
|
path, node = self._lookup_object(account, container, name,
|
1317 |
|
lock_container=True)
|
|
1340 |
# lock container path
|
|
1341 |
container_path, container_node = self._lookup_container(account,
|
|
1342 |
container)
|
|
1343 |
project = self._get_project(container_node)
|
|
1344 |
path, node = self._lookup_object(account, container, name)
|
1318 |
1345 |
|
1319 |
1346 |
if until is not None:
|
1320 |
1347 |
if node is None:
|
... | ... | |
1342 |
1369 |
except NameError:
|
1343 |
1370 |
self.permissions.access_clear(path)
|
1344 |
1371 |
self._report_size_change(
|
1345 |
|
user, account, -size, {
|
|
1372 |
user, account, -size, project, {
|
1346 |
1373 |
'action': 'object purge',
|
1347 |
1374 |
'path': path,
|
1348 |
1375 |
'versions': ','.join(str(i) for i in serials)
|
... | ... | |
1360 |
1387 |
update_statistics_ancestors_depth=1)
|
1361 |
1388 |
if report_size_change:
|
1362 |
1389 |
self._report_size_change(
|
1363 |
|
user, account, -del_size,
|
|
1390 |
user, account, -del_size, project,
|
1364 |
1391 |
{'action': 'object delete',
|
1365 |
1392 |
'path': path,
|
1366 |
1393 |
'versions': ','.join([str(dest_version_id)])})
|
... | ... | |
1389 |
1416 |
update_statistics_ancestors_depth=1)
|
1390 |
1417 |
if report_size_change:
|
1391 |
1418 |
self._report_size_change(
|
1392 |
|
user, account, -del_size,
|
|
1419 |
user, account, -del_size, project,
|
1393 |
1420 |
{'action': 'object delete',
|
1394 |
1421 |
'path': path,
|
1395 |
1422 |
'versions': ','.join([str(dest_version_id)])})
|
... | ... | |
1677 |
1704 |
|
1678 |
1705 |
@debug_method
|
1679 |
1706 |
@backend_method
|
1680 |
|
def _report_size_change(self, user, account, size, details=None):
|
|
1707 |
def _report_size_change(self, user, account, size, source, details=None):
|
1681 |
1708 |
details = details or {}
|
1682 |
1709 |
|
1683 |
1710 |
if size == 0:
|
... | ... | |
1697 |
1724 |
name = details['path'] if 'path' in details else ''
|
1698 |
1725 |
serial = self.astakosclient.issue_one_commission(
|
1699 |
1726 |
holder=account,
|
1700 |
|
source=DEFAULT_SOURCE,
|
|
1727 |
source=source,
|
1701 |
1728 |
provisions={'pithos.diskspace': size},
|
1702 |
1729 |
name=name)
|
1703 |
1730 |
except BaseException, e:
|
... | ... | |
1725 |
1752 |
|
1726 |
1753 |
# Policy functions.
|
1727 |
1754 |
|
1728 |
|
def _check_policy(self, policy, is_account_policy=True):
|
1729 |
|
default_policy = self.default_account_policy \
|
1730 |
|
if is_account_policy else self.default_container_policy
|
1731 |
|
for k in policy.keys():
|
1732 |
|
if policy[k] == '':
|
1733 |
|
policy[k] = default_policy.get(k)
|
|
1755 |
def _check_project(self, value):
|
|
1756 |
# raise ValueError('Bad quota source policy')
|
|
1757 |
pass
|
|
1758 |
|
|
1759 |
def _check_policy(self, policy):
|
1734 |
1760 |
for k, v in policy.iteritems():
|
1735 |
|
if k == 'quota':
|
|
1761 |
if k == QUOTA_POLICY:
|
1736 |
1762 |
q = int(v) # May raise ValueError.
|
1737 |
1763 |
if q < 0:
|
1738 |
1764 |
raise ValueError
|
1739 |
|
elif k == 'versioning':
|
|
1765 |
elif k == VERSIONING_POLICY:
|
1740 |
1766 |
if v not in ['auto', 'none']:
|
1741 |
1767 |
raise ValueError
|
|
1768 |
elif k == PROJECT:
|
|
1769 |
self._check_project(v)
|
1742 |
1770 |
else:
|
1743 |
1771 |
raise ValueError
|
1744 |
1772 |
|
1745 |
|
def _put_policy(self, node, policy, replace, is_account_policy=True):
|
1746 |
|
default_policy = self.default_account_policy \
|
1747 |
|
if is_account_policy else self.default_container_policy
|
|
1773 |
def _get_default_policy(self, node=None, is_account_policy=True,
|
|
1774 |
default_project=None):
|
|
1775 |
if is_account_policy:
|
|
1776 |
default_policy = self.default_account_policy
|
|
1777 |
else:
|
|
1778 |
default_policy = self.default_container_policy
|
|
1779 |
if default_project is None and node is not None:
|
|
1780 |
# set container's account as the default quota source
|
|
1781 |
default_project = self.node.node_get_parent_path(node)
|
|
1782 |
default_policy[PROJECT] = default_project
|
|
1783 |
return default_policy
|
|
1784 |
|
|
1785 |
def _put_policy(self, node, policy, replace,
|
|
1786 |
is_account_policy=True, default_project=None,
|
|
1787 |
check=True):
|
|
1788 |
default_policy = self._get_default_policy(node,
|
|
1789 |
is_account_policy,
|
|
1790 |
default_project)
|
1748 |
1791 |
if replace:
|
1749 |
1792 |
for k, v in default_policy.iteritems():
|
1750 |
1793 |
if k not in policy:
|
1751 |
1794 |
policy[k] = v
|
|
1795 |
if check:
|
|
1796 |
self._check_policy(policy)
|
|
1797 |
|
1752 |
1798 |
self.node.policy_set(node, policy)
|
1753 |
1799 |
|
1754 |
|
def _get_policy(self, node, is_account_policy=True):
|
1755 |
|
default_policy = self.default_account_policy \
|
1756 |
|
if is_account_policy else self.default_container_policy
|
|
1800 |
def _get_policy(self, node, is_account_policy=True,
|
|
1801 |
default_project=None):
|
|
1802 |
default_policy = self._get_default_policy(node,
|
|
1803 |
is_account_policy,
|
|
1804 |
default_project)
|
1757 |
1805 |
policy = default_policy.copy()
|
1758 |
1806 |
policy.update(self.node.policy_get(node))
|
1759 |
1807 |
return policy
|
1760 |
1808 |
|
|
1809 |
def _get_project(self, node):
|
|
1810 |
policy = self._get_policy(node, is_account_policy=False)
|
|
1811 |
return policy[PROJECT]
|
|
1812 |
|
1761 |
1813 |
def _apply_versioning(self, account, container, version_id,
|
1762 |
1814 |
update_statistics_ancestors_depth=None):
|
1763 |
1815 |
"""Delete the provided version if such is the policy.
|
... | ... | |
1768 |
1820 |
return 0
|
1769 |
1821 |
path, node = self._lookup_container(account, container)
|
1770 |
1822 |
versioning = self._get_policy(
|
1771 |
|
node, is_account_policy=False)['versioning']
|
|
1823 |
node, is_account_policy=False)[VERSIONING_POLICY]
|
1772 |
1824 |
if versioning != 'auto':
|
1773 |
1825 |
hash, size = self.node.version_remove(
|
1774 |
1826 |
version_id, update_statistics_ancestors_depth)
|