Revision 78e1f8da snf-pithos-backend/pithos/backends/modular.py

b/snf-pithos-backend/pithos/backends/modular.py
116 116

  
117 117
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
118 118

  
119
QUOTA_POLICY = 'quota'
120
VERSIONING_POLICY = 'versioning'
121
PROJECT = 'project'
122

  
119 123
inf = float('inf')
120 124

  
121 125
ULTIMATE_ANSWER = 42
122 126

  
123
DEFAULT_SOURCE = 'system'
124 127
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'
125 128

  
126 129
logger = logging.getLogger(__name__)
......
245 248
        container_versioning_policy = container_versioning_policy \
246 249
            or DEFAULT_CONTAINER_VERSIONING
247 250

  
248
        self.default_account_policy = {'quota': account_quota_policy}
251
        self.default_account_policy = {}
249 252
        self.default_container_policy = {
250
            'quota': container_quota_policy,
251
            'versioning': container_versioning_policy
253
            QUOTA_POLICY: container_quota_policy,
254
            VERSIONING_POLICY: container_versioning_policy,
255
            PROJECT: None
252 256
        }
253 257
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
254 258
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
......
390 394

  
391 395
        return self._allowed_accounts(user)
392 396

  
393
    def _get_account_quotas(self, account):
394
        """Get account usage from astakos."""
395

  
396
        quotas = self.astakosclient.service_get_quotas(account)[account]
397
        return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
398
                                                  {})
399

  
400
    def _get_account_quotas(self, account):
401
        """Get account usage from astakos."""
402

  
403
        quotas = self.astakosclient.service_get_quotas(account)[account]
404
        return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
405
                                                  {})
406

  
407 397
    @debug_method
408 398
    @backend_method
409
    def get_account_meta(self, user, account, domain, until=None,
399
    def get_account_meta(self, user, account, domain=None, until=None,
410 400
                         include_user_defined=True):
411 401
        """Return a dictionary with the account metadata for the domain."""
412 402

  
......
435 425
        else:
436 426
            meta = {}
437 427
            if props is not None and include_user_defined:
428
                if domain is None:
429
                    raise ValueError(
430
                        'Domain argument is obligatory for getting '
431
                        'user defined metadata')
438 432
                meta.update(
439 433
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
440 434
            if until is not None:
441 435
                meta.update({'until_timestamp': tstamp})
442 436
            meta.update({'name': account, 'count': count, 'bytes': bytes})
443 437
            if self.using_external_quotaholder:
444
                external_quota = self._get_account_quotas(account)
445
                meta['bytes'] = external_quota.get('usage', 0)
438
                external_quota = self.astakosclient.service_get_quotas(
439
                    account)[account]
440
                meta['bytes'] = sum(d['pithos.diskspace']['usage'] for d in
441
                                    external_quota.values())
446 442
        meta.update({'modified': modified})
447 443
        return meta
448 444

  
......
494 490
        path, node = self._lookup_account(account, True)
495 491
        policy = self._get_policy(node, is_account_policy=True)
496 492
        if self.using_external_quotaholder:
497
            external_quota = self._get_account_quotas(account)
498
            policy['quota'] = external_quota.get('limit', 0)
493
            external_quota = self.astakosclient.service_get_quotas(
494
                account)[account]
495
            policy.update(dict(('%s-%s' % (QUOTA_POLICY, k),
496
                                v['pithos.diskspace']['limit']) for k, v in
497
                               external_quota.items()))
498

  
499 499
        return policy
500 500

  
501 501
    @debug_method
......
505 505

  
506 506
        self._can_write_account(user, account)
507 507
        path, node = self._lookup_account(account, True)
508
        self._check_policy(policy, is_account_policy=True)
509
        self._put_policy(node, policy, replace, is_account_policy=True)
508
        self._put_policy(node, policy, replace, is_account_policy=True,
509
                         check=True)
510 510

  
511 511
    @debug_method
512 512
    @backend_method
......
518 518
        node = self.node.node_lookup(account)
519 519
        if node is not None:
520 520
            raise AccountExists('Account already exists')
521
        if policy:
522
            self._check_policy(policy, is_account_policy=True)
523 521
        node = self._put_path(user, self.ROOTNODE, account,
524 522
                              update_statistics_ancestors_depth=-1)
525
        self._put_policy(node, policy, True, is_account_policy=True)
523
        self._put_policy(node, policy, True, is_account_policy=True,
524
                         check=True if policy else False)
526 525

  
527 526
    @debug_method
528 527
    @backend_method
......
586 585

  
587 586
    @debug_method
588 587
    @backend_method
589
    def get_container_meta(self, user, account, container, domain, until=None,
590
                           include_user_defined=True):
588
    def get_container_meta(self, user, account, container, domain=None,
589
                           until=None, include_user_defined=True):
591 590
        """Return a dictionary with the container metadata for the domain."""
592 591

  
593 592
        self._can_read_container(user, account, container)
......
611 610
        else:
612 611
            meta = {}
613 612
            if include_user_defined:
613
                if domain is None:
614
                    raise ValueError(
615
                        'Domain argument is obligatory for getting '
616
                        'user defined metadata')
614 617
                meta.update(
615 618
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
616 619
            if until is not None:
......
632 635
            update_statistics_ancestors_depth=0)
633 636
        if src_version_id is not None:
634 637
            versioning = self._get_policy(
635
                node, is_account_policy=False)['versioning']
638
                node, is_account_policy=False)[VERSIONING_POLICY]
636 639
            if versioning != 'auto':
637 640
                self.node.version_remove(src_version_id,
638 641
                                         update_statistics_ancestors_depth=0)
......
656 659

  
657 660
        self._can_write_container(user, account, container)
658 661
        path, node = self._lookup_container(account, container)
659
        self._check_policy(policy, is_account_policy=False)
660
        self._put_policy(node, policy, replace, is_account_policy=False)
662

  
663
        if PROJECT in policy:
664
            project = self._get_project(node)
665
            try:
666
                serial = self.astakosclient.issue_resource_reassignment(
667
                    holder=account,
668
                    from_source=project,
669
                    to_source=policy[PROJECT],
670
                    provisions={'pithos.diskspace': self.get_container_meta(
671
                        user, account, container,
672
                        include_user_defined=False)['bytes']})
673
            except BaseException, e:
674
                raise QuotaError(e)
675
            else:
676
                self.serials.append(serial)
677

  
678
        self._put_policy(node, policy, replace, is_account_policy=False,
679
                         default_project=account, check=True)
661 680

  
662 681
    @debug_method
663 682
    @backend_method
......
672 691
            pass
673 692
        else:
674 693
            raise ContainerExists('Container already exists')
675
        if policy:
676
            self._check_policy(policy, is_account_policy=False)
677 694
        path = '/'.join((account, container))
678 695
        node = self._put_path(
679 696
            user, self._lookup_account(account, True)[1], path,
680 697
            update_statistics_ancestors_depth=-1)
681
        self._put_policy(node, policy, True, is_account_policy=False)
698
        self._put_policy(node, policy, True, is_account_policy=False,
699
                         default_project=account,
700
                         check=True if policy else False)
682 701

  
683 702
    @debug_method
684 703
    @backend_method
......
688 707

  
689 708
        self._can_write_container(user, account, container)
690 709
        path, node = self._lookup_container(account, container)
710
        project = self._get_project(node)
691 711

  
692 712
        if until is not None:
693 713
            hashes, size, serials = self.node.node_purge_children(
......
699 719
                                          update_statistics_ancestors_depth=0)
700 720
            if not self.free_versioning:
701 721
                self._report_size_change(
702
                    user, account, -size, {
722
                    user, account, -size, project, {
703 723
                        'action': 'container purge',
704 724
                        'path': path,
705 725
                        'versions': ','.join(str(i) for i in serials)
......
720 740
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
721 741
            if not self.free_versioning:
722 742
                self._report_size_change(
723
                    user, account, -size, {
743
                    user, account, -size, project, {
724 744
                        'action': 'container purge',
725 745
                        'path': path,
726 746
                        'versions': ','.join(str(i) for i in serials)
......
746 766
                    account, container, src_version_id,
747 767
                    update_statistics_ancestors_depth=1)
748 768
                self._report_size_change(
749
                    user, account, -del_size, {
769
                    user, account, -del_size, project, {
750 770
                        'action': 'object delete',
751 771
                        'path': path,
752 772
                        'versions': ','.join([str(dest_version_id)])})
......
915 935

  
916 936
    @debug_method
917 937
    @backend_method
918
    def get_object_meta(self, user, account, container, name, domain,
938
    def get_object_meta(self, user, account, container, name, domain=None,
919 939
                        version=None, include_user_defined=True):
920 940
        """Return a dictionary with the object metadata for the domain."""
921 941

  
......
937 957

  
938 958
        meta = {}
939 959
        if include_user_defined:
960
            if domain is None:
961
                raise ValueError(
962
                    'Domain argument is obligatory for getting '
963
                    'user defined metadata')
940 964
            meta.update(
941 965
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
942 966
        meta.update({'name': name,
......
1095 1119
        account_path, account_node = self._lookup_account(account, True)
1096 1120
        container_path, container_node = self._lookup_container(
1097 1121
            account, container)
1122
        project = self._get_project(container_node)
1098 1123

  
1099 1124
        path, node = self._put_object_node(
1100 1125
            container_path, container_node, name)
......
1116 1141
            # Check account quota.
1117 1142
            if not self.using_external_quotaholder:
1118 1143
                account_quota = long(self._get_policy(
1119
                    account_node, is_account_policy=True)['quota'])
1144
                    account_node, is_account_policy=True)[QUOTA_POLICY])
1120 1145
                account_usage = self._get_statistics(account_node,
1121 1146
                                                     compute=True)[1]
1122 1147
                if (account_quota > 0 and account_usage > account_quota):
......
1126 1151

  
1127 1152
            # Check container quota.
1128 1153
            container_quota = long(self._get_policy(
1129
                container_node, is_account_policy=False)['quota'])
1154
                container_node, is_account_policy=False)[QUOTA_POLICY])
1130 1155
            container_usage = self._get_statistics(container_node)[1]
1131 1156
            if (container_quota > 0 and container_usage > container_quota):
1132 1157
                # This must be executed in a transaction, so the version is
......
1139 1164

  
1140 1165
        if report_size_change:
1141 1166
            self._report_size_change(
1142
                user, account, size_delta,
1167
                user, account, size_delta, project,
1143 1168
                {'action': 'object update', 'path': path,
1144 1169
                 'versions': ','.join([str(dest_version_id)])})
1145 1170
        if permissions is not None:
......
1312 1337
        if user != account:
1313 1338
            raise NotAllowedError
1314 1339

  
1315
        # lookup object and lock container path also
1316
        path, node = self._lookup_object(account, container, name,
1317
                                         lock_container=True)
1340
        # lock container path
1341
        container_path, container_node = self._lookup_container(account,
1342
                                                                container)
1343
        project = self._get_project(container_node)
1344
        path, node = self._lookup_object(account, container, name)
1318 1345

  
1319 1346
        if until is not None:
1320 1347
            if node is None:
......
1342 1369
            except NameError:
1343 1370
                self.permissions.access_clear(path)
1344 1371
            self._report_size_change(
1345
                user, account, -size, {
1372
                user, account, -size, project, {
1346 1373
                    'action': 'object purge',
1347 1374
                    'path': path,
1348 1375
                    'versions': ','.join(str(i) for i in serials)
......
1360 1387
                                          update_statistics_ancestors_depth=1)
1361 1388
        if report_size_change:
1362 1389
            self._report_size_change(
1363
                user, account, -del_size,
1390
                user, account, -del_size, project,
1364 1391
                {'action': 'object delete',
1365 1392
                 'path': path,
1366 1393
                 'versions': ','.join([str(dest_version_id)])})
......
1389 1416
                    update_statistics_ancestors_depth=1)
1390 1417
                if report_size_change:
1391 1418
                    self._report_size_change(
1392
                        user, account, -del_size,
1419
                        user, account, -del_size, project,
1393 1420
                        {'action': 'object delete',
1394 1421
                         'path': path,
1395 1422
                         'versions': ','.join([str(dest_version_id)])})
......
1677 1704

  
1678 1705
    @debug_method
1679 1706
    @backend_method
1680
    def _report_size_change(self, user, account, size, details=None):
1707
    def _report_size_change(self, user, account, size, source, details=None):
1681 1708
        details = details or {}
1682 1709

  
1683 1710
        if size == 0:
......
1697 1724
            name = details['path'] if 'path' in details else ''
1698 1725
            serial = self.astakosclient.issue_one_commission(
1699 1726
                holder=account,
1700
                source=DEFAULT_SOURCE,
1727
                source=source,
1701 1728
                provisions={'pithos.diskspace': size},
1702 1729
                name=name)
1703 1730
        except BaseException, e:
......
1725 1752

  
1726 1753
    # Policy functions.
1727 1754

  
1728
    def _check_policy(self, policy, is_account_policy=True):
1729
        default_policy = self.default_account_policy \
1730
            if is_account_policy else self.default_container_policy
1731
        for k in policy.keys():
1732
            if policy[k] == '':
1733
                policy[k] = default_policy.get(k)
1755
    def _check_project(self, value):
1756
        # raise ValueError('Bad quota source policy')
1757
        pass
1758

  
1759
    def _check_policy(self, policy):
1734 1760
        for k, v in policy.iteritems():
1735
            if k == 'quota':
1761
            if k == QUOTA_POLICY:
1736 1762
                q = int(v)  # May raise ValueError.
1737 1763
                if q < 0:
1738 1764
                    raise ValueError
1739
            elif k == 'versioning':
1765
            elif k == VERSIONING_POLICY:
1740 1766
                if v not in ['auto', 'none']:
1741 1767
                    raise ValueError
1768
            elif k == PROJECT:
1769
                self._check_project(v)
1742 1770
            else:
1743 1771
                raise ValueError
1744 1772

  
1745
    def _put_policy(self, node, policy, replace, is_account_policy=True):
1746
        default_policy = self.default_account_policy \
1747
            if is_account_policy else self.default_container_policy
1773
    def _get_default_policy(self, node=None, is_account_policy=True,
1774
                            default_project=None):
1775
        if is_account_policy:
1776
            default_policy = self.default_account_policy
1777
        else:
1778
            default_policy = self.default_container_policy
1779
            if default_project is None and node is not None:
1780
                # set container's account as the default quota source
1781
                default_project = self.node.node_get_parent_path(node)
1782
            default_policy[PROJECT] = default_project
1783
        return default_policy
1784

  
1785
    def _put_policy(self, node, policy, replace,
1786
                    is_account_policy=True, default_project=None,
1787
                    check=True):
1788
        default_policy = self._get_default_policy(node,
1789
                                                  is_account_policy,
1790
                                                  default_project)
1748 1791
        if replace:
1749 1792
            for k, v in default_policy.iteritems():
1750 1793
                if k not in policy:
1751 1794
                    policy[k] = v
1795
        if check:
1796
            self._check_policy(policy)
1797

  
1752 1798
        self.node.policy_set(node, policy)
1753 1799

  
1754
    def _get_policy(self, node, is_account_policy=True):
1755
        default_policy = self.default_account_policy \
1756
            if is_account_policy else self.default_container_policy
1800
    def _get_policy(self, node, is_account_policy=True,
1801
                    default_project=None):
1802
        default_policy = self._get_default_policy(node,
1803
                                                  is_account_policy,
1804
                                                  default_project)
1757 1805
        policy = default_policy.copy()
1758 1806
        policy.update(self.node.policy_get(node))
1759 1807
        return policy
1760 1808

  
1809
    def _get_project(self, node):
1810
        policy = self._get_policy(node, is_account_policy=False)
1811
        return policy[PROJECT]
1812

  
1761 1813
    def _apply_versioning(self, account, container, version_id,
1762 1814
                          update_statistics_ancestors_depth=None):
1763 1815
        """Delete the provided version if such is the policy.
......
1768 1820
            return 0
1769 1821
        path, node = self._lookup_container(account, container)
1770 1822
        versioning = self._get_policy(
1771
            node, is_account_policy=False)['versioning']
1823
            node, is_account_policy=False)[VERSIONING_POLICY]
1772 1824
        if versioning != 'auto':
1773 1825
            hash, size = self.node.version_remove(
1774 1826
                version_id, update_statistics_ancestors_depth)

Also available in: Unified diff