Revision 9f135224

b/snf-pithos-app/pithos/api/util.py
1084 1084
                # Add a PithosBackend as attribute of the request object
1085 1085
                request.backend = get_backend()
1086 1086
                request.backend.lock_container_path = lock_container_path
1087
                request.backend.wrapper.execute()
1088
                request.backend.serials = []
1089
                request.backend.messages = []
1090

  
1087 1091
                # Many API method expect thet X-Auth-Token in request,token
1088 1092
                request.token = request.x_auth_token
1089 1093
                update_request_headers(request)
1090 1094
                response = func(request, *args, **kwargs)
1091 1095
                update_response_headers(request, response)
1096

  
1097
                # send messages produced
1098
                for m in request.backend.messages:
1099
                    request.backend.queue.send(*m)
1100

  
1101
                # register serials
1102
                if request.backend.serials:
1103
                    request.backend.commission_serials.insert_many(
1104
                        request.backend.serials)
1105

  
1106
                    # commit to ensure that the serials are registered
1107
                    # even if resolve commission fails
1108
                    request.backend.wrapper.commit()
1109

  
1110
                    # start new transaction
1111
                    request.backend.wrapper.execute()
1112

  
1113
                    r = request.backend.astakosclient.resolve_commissions(
1114
                                token=request.backend.service_token,
1115
                                accept_serials=request.backend.serials,
1116
                                reject_serials=[])
1117
                    request.backend.commission_serials.delete_many(
1118
                        r['accepted'])
1119

  
1120
                request.backend.wrapper.commit()
1092 1121
                return response
1122
            except:
1123
                if request.backend.serials:
1124
                    request.backend.astakosclient.resolve_commissions(
1125
                        token=request.backend.service_token,
1126
                        accept_serials=[],
1127
                        reject_serials=request.backend.serials)
1128
                request.backend.wrapper.rollback()
1129
                raise
1093 1130
            finally:
1094 1131
                # Always close PithosBackend connection
1095 1132
                if getattr(request, "backend", None) is not None:
b/snf-pithos-backend/pithos/backends/modular.py
118 118
logger = logging.getLogger(__name__)
119 119

  
120 120

  
121
def backend_method(func=None, autocommit=1):
122
    if func is None:
123
        def fn(func):
124
            return backend_method(func, autocommit)
125
        return fn
126

  
127
    if not autocommit:
128
        return func
129

  
130
    def fn(self, *args, **kw):
131
        self.wrapper.execute()
132
        serials = []
133
        self.serials = serials
134
        self.messages = []
135

  
136
        try:
137
            ret = func(self, *args, **kw)
138
            for m in self.messages:
139
                self.queue.send(*m)
140
            if self.serials:
141
                self.commission_serials.insert_many(self.serials)
142

  
143
                # commit to ensure that the serials are registered
144
                # even if accept commission fails
145
                self.wrapper.commit()
146
                self.wrapper.execute()
147

  
148
                r = self.astakosclient.resolve_commissions(
149
                            token=self.service_token,
150
                            accept_serials=self.serials,
151
                            reject_serials=[])
152
                self.commission_serials.delete_many(r['accepted'])
153

  
154
            self.wrapper.commit()
155
            return ret
156
        except:
157
            if self.serials:
158
                self.astakosclient.resolve_commissions(
159
                    token=self.service_token,
160
                    accept_serials=[],
161
                    reject_serials=self.serials)
162
            self.wrapper.rollback()
163
            raise
164
    return fn
165

  
166

  
167 121
class ModularBackend(BaseBackend):
168 122
    """A modular backend.
169 123

  
......
275 229
    def using_external_quotaholder(self):
276 230
        return not isinstance(self.astakosclient, DisabledAstakosClient)
277 231

  
278
    @backend_method
279 232
    def list_accounts(self, user, marker=None, limit=10000):
280 233
        """Return a list of accounts the user can access."""
281 234

  
......
284 237
        start, limit = self._list_limits(allowed, marker, limit)
285 238
        return allowed[start:start + limit]
286 239

  
287
    @backend_method
288 240
    def get_account_meta(
289 241
            self, user, account, domain, until=None, include_user_defined=True,
290 242
            external_quota=None):
......
327 279
        meta.update({'modified': modified})
328 280
        return meta
329 281

  
330
    @backend_method
331 282
    def update_account_meta(self, user, account, domain, meta, replace=False):
332 283
        """Update the metadata associated with the account for the domain."""
333 284

  
......
339 290
        self._put_metadata(user, node, domain, meta, replace,
340 291
                           update_statistics_ancestors_depth=-1)
341 292

  
342
    @backend_method
343 293
    def get_account_groups(self, user, account):
344 294
        """Return a dictionary with the user groups defined for this account."""
345 295

  
......
351 301
        self._lookup_account(account, True)
352 302
        return self.permissions.group_dict(account)
353 303

  
354
    @backend_method
355 304
    def update_account_groups(self, user, account, groups, replace=False):
356 305
        """Update the groups associated with the account."""
357 306

  
......
369 318
            if v:
370 319
                self.permissions.group_addmany(account, k, v)
371 320

  
372
    @backend_method
373 321
    def get_account_policy(self, user, account, external_quota=None):
374 322
        """Return a dictionary with the account policy."""
375 323

  
......
385 333
            policy['quota'] = external_quota.get('limit', 0)
386 334
        return policy
387 335

  
388
    @backend_method
389 336
    def update_account_policy(self, user, account, policy, replace=False):
390 337
        """Update the policy associated with the account."""
391 338

  
......
397 344
        self._check_policy(policy, is_account_policy=True)
398 345
        self._put_policy(node, policy, replace, is_account_policy=True)
399 346

  
400
    @backend_method
401 347
    def put_account(self, user, account, policy=None):
402 348
        """Create a new account with the given name."""
403 349

  
......
414 360
                              update_statistics_ancestors_depth=-1)
415 361
        self._put_policy(node, policy, True, is_account_policy=True)
416 362

  
417
    @backend_method
418 363
    def delete_account(self, user, account):
419 364
        """Delete the account with the given name."""
420 365

  
......
429 374
            raise AccountNotEmpty('Account is not empty')
430 375
        self.permissions.group_destroy(account)
431 376

  
432
    @backend_method
433 377
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
434 378
        """Return a list of containers existing under an account."""
435 379

  
......
457 401
            [x[0] for x in containers], marker, limit)
458 402
        return containers[start:start + limit]
459 403

  
460
    @backend_method
461 404
    def list_container_meta(self, user, account, container, domain, until=None):
462 405
        """Return a list with all the container's object meta keys for the domain."""
463 406

  
......
476 419
        allowed = self._get_formatted_paths(allowed)
477 420
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
478 421

  
479
    @backend_method
480 422
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
481 423
        """Return a dictionary with the container metadata for the domain."""
482 424

  
......
510 452
        meta.update({'modified': modified})
511 453
        return meta
512 454

  
513
    @backend_method
514 455
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
515 456
        """Update the metadata associated with the container for the domain."""
516 457

  
......
529 470
                self.node.version_remove(src_version_id,
530 471
                                         update_statistics_ancestors_depth=0)
531 472

  
532
    @backend_method
533 473
    def get_container_policy(self, user, account, container):
534 474
        """Return a dictionary with the container policy."""
535 475

  
......
542 482
        path, node = self._lookup_container(account, container)
543 483
        return self._get_policy(node, is_account_policy=False)
544 484

  
545
    @backend_method
546 485
    def update_container_policy(self, user, account, container, policy, replace=False):
547 486
        """Update the policy associated with the container."""
548 487

  
......
554 493
        self._check_policy(policy, is_account_policy=False)
555 494
        self._put_policy(node, policy, replace, is_account_policy=False)
556 495

  
557
    @backend_method
558 496
    def put_container(self, user, account, container, policy=None):
559 497
        """Create a new container with the given name."""
560 498

  
......
577 515
            update_statistics_ancestors_depth=-1)
578 516
        self._put_policy(node, policy, True, is_account_policy=False)
579 517

  
580
    @backend_method
581 518
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
582 519
        """Delete/purge the container with the given name."""
583 520

  
......
731 668
                return []
732 669
        return allowed
733 670

  
734
    @backend_method
735 671
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
736 672
        """Return a list of object (name, version_id) tuples existing under a container."""
737 673

  
......
739 675
        keys = keys or []
740 676
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
741 677

  
742
    @backend_method
743 678
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
744 679
        """Return a list of object metadata dicts existing under a container."""
745 680

  
......
763 698
                                'checksum': p[self.CHECKSUM + 1]})
764 699
        return objects
765 700

  
766
    @backend_method
767 701
    def list_object_permissions(self, user, account, container, prefix=''):
768 702
        """Return a list of paths that enforce permissions under a container."""
769 703

  
......
771 705
                     account, container, prefix)
772 706
        return self._list_object_permissions(user, account, container, prefix, True, False)
773 707

  
774
    @backend_method
775 708
    def list_object_public(self, user, account, container, prefix=''):
776 709
        """Return a dict mapping paths to public ids for objects that are public under a container."""
777 710

  
......
782 715
            public[path] = p
783 716
        return public
784 717

  
785
    @backend_method
786 718
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
787 719
        """Return a dictionary with the object metadata for the domain."""
788 720

  
......
820 752
                     'checksum': props[self.CHECKSUM]})
821 753
        return meta
822 754

  
823
    @backend_method
824 755
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
825 756
        """Update the metadata associated with the object for the domain and return the new version."""
826 757

  
......
835 766
                               update_statistics_ancestors_depth=1)
836 767
        return dest_version_id
837 768

  
838
    @backend_method
839 769
    def get_object_permissions(self, user, account, container, name):
840 770
        """Return the action allowed on the object, the path
841 771
        from which the object gets its permissions from,
......
855 785
        self._lookup_object(account, container, name)
856 786
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
857 787

  
858
    @backend_method
859 788
    def update_object_permissions(self, user, account, container, name, permissions):
860 789
        """Update the permissions associated with the object."""
861 790

  
......
869 798
        self._report_sharing_change(user, account, path, {'members':
870 799
                                    self.permissions.access_members(path)})
871 800

  
872
    @backend_method
873 801
    def get_object_public(self, user, account, container, name):
874 802
        """Return the public id of the object if applicable."""
875 803

  
......
880 808
        p = self.permissions.public_get(path)
881 809
        return p
882 810

  
883
    @backend_method
884 811
    def update_object_public(self, user, account, container, name, public):
885 812
        """Update the public status of the object."""
886 813

  
......
895 822
                path, self.public_url_security, self.public_url_alphabet
896 823
            )
897 824

  
898
    @backend_method
899 825
    def get_object_hashmap(self, user, account, container, name, version=None):
900 826
        """Return the object's size and a list with partial hashes."""
901 827

  
......
975 901
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
976 902
        return dest_version_id
977 903

  
978
    @backend_method
979 904
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta=None, replace_meta=False, permissions=None):
980 905
        """Create/update an object with the specified size and partial hashes."""
981 906

  
......
997 922
        self.store.map_put(hash, map)
998 923
        return dest_version_id
999 924

  
1000
    @backend_method
1001 925
    def update_object_checksum(self, user, account, container, name, version, checksum):
1002 926
        """Update an object's checksum."""
1003 927

  
......
1053 977
                    self._delete_object(user, src_account, src_container, path)
1054 978
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
1055 979

  
1056
    @backend_method
1057 980
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, src_version=None, delimiter=None):
1058 981
        """Copy an object's data and metadata."""
1059 982

  
......
1062 985
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
1063 986
        return dest_version_id
1064 987

  
1065
    @backend_method
1066 988
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, delimiter=None):
1067 989
        """Move an object's data and metadata."""
1068 990

  
......
1149 1071
                paths.append(path)
1150 1072
            self.permissions.access_clear_bulk(paths)
1151 1073

  
1152
    @backend_method
1153 1074
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1154 1075
        """Delete/purge an object."""
1155 1076

  
......
1157 1078
                     account, container, name, until, prefix, delimiter)
1158 1079
        self._delete_object(user, account, container, name, until, delimiter)
1159 1080

  
1160
    @backend_method
1161 1081
    def list_versions(self, user, account, container, name):
1162 1082
        """Return a list of all (version, version_timestamp) tuples for an object."""
1163 1083

  
......
1168 1088
        versions = self.node.node_get_versions(node)
1169 1089
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1170 1090

  
1171
    @backend_method
1172 1091
    def get_uuid(self, user, uuid):
1173 1092
        """Return the (account, container, name) for the UUID given."""
1174 1093

  
......
1181 1100
        self._can_read(user, account, container, name)
1182 1101
        return (account, container, name)
1183 1102

  
1184
    @backend_method
1185 1103
    def get_public(self, user, public):
1186 1104
        """Return the (account, container, name) for the public id given."""
1187 1105

  
......
1193 1111
        self._can_read(user, account, container, name)
1194 1112
        return (account, container, name)
1195 1113

  
1196
    @backend_method(autocommit=0)
1197 1114
    def get_block(self, hash):
1198 1115
        """Return a block's data."""
1199 1116

  
......
1203 1120
            raise ItemNotExists('Block does not exist')
1204 1121
        return block
1205 1122

  
1206
    @backend_method(autocommit=0)
1207 1123
    def put_block(self, data):
1208 1124
        """Store a block and return the hash."""
1209 1125

  
1210 1126
        logger.debug("put_block: %s", len(data))
1211 1127
        return binascii.hexlify(self.store.block_put(data))
1212 1128

  
1213
    @backend_method(autocommit=0)
1214 1129
    def update_block(self, hash, data, offset=0):
1215 1130
        """Update a known block and return the hash."""
1216 1131

  
......
1592 1507

  
1593 1508
    # Domain functions
1594 1509

  
1595
    @backend_method
1596 1510
    def get_domain_objects(self, domain, user=None):
1597 1511
        obj_list = self.node.domain_object_list(domain, CLUSTER_NORMAL)
1598 1512
        if user != None:

Also available in: Unified diff