Revision 3248f20a

b/snf-pithos-app/pithos/api/dispatch.py
14 14
        return
15 15

  
16 16
    backend = get_backend()
17
    backend.pre_exec()
17 18

  
18 19
    path = m['value']
19 20
    account, container, name = path.split('/', 2)
......
32 33
    except Exception, e:
33 34
        print 'WARNING: Can not update checksum for path "%s" (%s)' % (path, e)
34 35

  
36
    backend.post_exec()
35 37
    backend.close()
36 38

  
37 39

  
b/snf-pithos-app/pithos/api/manage_accounts/__init__.py
31 31
# interpreted as representing official policies, either expressed
32 32
# or implied, of GRNET S.A.
33 33

  
34
from pithos.api.util import (
35
    get_backend, split_container_object_string, Checksum, NoChecksum
36
)
34
from pithos.api.util import (get_backend, split_container_object_string,
35
                             Checksum, NoChecksum)
37 36
import re
38 37
import os
39 38

  
39
from functools import wraps
40

  
40 41

  
41 42
def data_read_iterator(str, size=1024):
42 43
    offset = 0
......
48 49
        yield data
49 50

  
50 51

  
52
def manage_transactions(lock_container_path=False):
53
    """Decorator function for ManageAccounts methods."""
54
    def decorator(func):
55
        @wraps(func)
56
        def wrapper(self, *args, **kwargs):
57
            self.backend.pre_exec(lock_container_path)
58
            try:
59
                result = func(self, *args, **kwargs)
60
            except:
61
                self.backend.post_exec(False)
62
                raise
63
            else:
64
                dry = kwargs.get('dry', False)
65
                if dry:
66
                    self.backend.post_exec(False)
67
                else:
68
                    self.backend.post_exec(True)
69
                return result
70
        return wrapper
71
    return decorator
72

  
73

  
51 74
class ManageAccounts():
52 75
    def __init__(self):
53 76
        self.backend = get_backend()
......
55 78
    def cleanup(self):
56 79
        self.backend.close()
57 80

  
81
    def _existing_accounts(self):
82
        l = sorted([path for path, _ in self.backend.node.node_accounts()])
83
        return l
84

  
85
    @manage_transactions()
58 86
    def existing_accounts(self):
59
        return sorted([path for path, _ in self.backend.node.node_accounts()])
87
        return self._existing_accounts()
60 88

  
89
    @manage_transactions()
61 90
    def duplicate_accounts(self):
62
        accounts = self.existing_accounts()
91
        accounts = self._existing_accounts()
63 92
        duplicates = []
64 93
        for i in range(len(accounts)):
65 94
            account = accounts[i]
66 95
            matcher = re.compile(account, re.IGNORECASE)
67
            duplicate = filter(matcher.match, (i for i in accounts[i + 1:] \
68
                if len(i) == len(account)))
96
            duplicate = filter(matcher.match, (i for i in accounts[i + 1:] if
97
                                               len(i) == len(account)))
69 98
            if duplicate:
70 99
                duplicate.insert(0, account)
71 100
                duplicates.append(duplicate)
72 101
        return duplicates
73 102

  
74
    def list_all_containers(self, account, step=10):
103
    def _list_all_containers(self, account, step=10):
75 104
        containers = []
76 105
        marker = None
77 106
        while 1:
......
83 112
            marker = more[-1]
84 113
        return containers
85 114

  
86
    def list_all_container_objects(self, account, container, virtual=False):
115
    @manage_transactions()
116
    def list_all_containers(self, account, step=10):
117
        return self._list_all_containers(account, step)
118

  
119
    def _list_all_container_objects(self, account, container, virtual=False):
87 120
        objects = []
88 121
        marker = None
89 122
        while 1:
......
95 128
            marker = more[-1][0]
96 129
        return objects
97 130

  
98
    def list_all_objects(self, account, virtual=False):
99
        containers = self.list_all_containers(account)
131
    @manage_transactions()
132
    def list_all_container_objects(self, account, container, virtual=False):
133
        return self._list_all_container_objects(account, container, virtual)
134

  
135
    def _list_all_objects(self, account, virtual=False):
136
        containers = self._list_all_containers(account)
100 137
        objects = []
101 138
        extend = objects.extend
102 139
        for c in containers:
103
            more = self.list_all_container_objects(account, c, virtual=virtual)
140
            more = self._list_all_container_objects(account, c,
141
                                                    virtual=virtual)
104 142
            extend([os.path.join(c, i) for i in more])
105 143
        return objects
106 144

  
107
    def list_past_versions(self, account, container, name):
145
    @manage_transactions()
146
    def list_all_objects(self, account, virtual=False):
147
        return self._list_all_objects(account, virtual)
148

  
149
    def _list_past_versions(self, account, container, name):
108 150
        versions = self.backend.list_versions(account, account, container,
109 151
                                              name)
110 152
        # do not return the current version
111 153
        return list(x[0] for x in versions[:-1])
112 154

  
113
    def move_object(self, src_account, src_container, src_name,
114
                    dest_account, dry=True, silent=False):
115
        if src_account not in self.existing_accounts():
155
    @manage_transactions()
156
    def list_past_versions(self, account, container, name):
157
        return self._list_past_versions(account, container, name)
158

  
159
    @manage_transactions(lock_container_path=True)
160
    def move_object(self, src_account, src_container, src_name, dest_account,
161
                    dry=True, silent=False):
162
        if src_account not in self._existing_accounts():
116 163
            raise NameError('%s does not exist' % src_account)
117
        if dest_account not in self.existing_accounts():
164
        if dest_account not in self._existing_accounts():
118 165
            raise NameError('%s does not exist' % dest_account)
119 166

  
120
        self.backend.wrapper.execute()
121
        try:
122
            self._copy_object(src_account, src_container, src_name,
123
                              dest_account, move=True)
124

  
167
        self._copy_object(src_account, src_container, src_name,
168
                          dest_account, move=True)
169
        if not silent:
125 170
            if dry:
126
                if not silent:
127
                    print "Skipping database commit."
128
                self.backend.wrapper.rollback()
171
                print "Database commit skipped."
129 172
            else:
130
                self.backend.wrapper.commit()
131
                if not silent:
132
                    print "%s is deleted." % src_account
133
        except:
134
            self.backend.wrapper.rollback()
135
            raise
173
                print "%s is deleted" % src_account
136 174

  
137 175
    def _copy_object(self, src_account, src_container, src_name,
138
                    dest_account, move=False):
176
                     dest_account, move=False):
139 177
        path = os.path.join(src_container, src_name)
140 178
        fullpath = os.path.join(src_account, path)
141 179
        dest_container = src_container
......
147 185
        content_type = meta.get('type')
148 186

  
149 187
        # get source object history
150
        versions = self.list_past_versions(src_account, src_container,
151
                                           src_name)
188
        versions = self._list_past_versions(src_account, src_container,
189
                                            src_name)
152 190

  
153 191
        # get source object permissions
154 192
        permissions = self.backend.permissions.access_get(fullpath)
......
203 241
    def _merge_account(self, src_account, dest_account, delete_src=False):
204 242
            # TODO: handle exceptions
205 243
            # copy all source objects
206
            for path in self.list_all_objects(src_account):
244
            for path in self._list_all_objects(src_account):
207 245
                src_container, src_name = split_container_object_string(
208 246
                    '/%s' % path)
209 247

  
......
221 259
                                                       permissions)
222 260

  
223 261
                self._copy_object(src_account, src_container, src_name,
224
                                 dest_account, move=delete_src)
262
                                  dest_account, move=delete_src)
225 263

  
226 264
            # move groups also
227 265
            groups = self.backend.get_account_groups(src_account, src_account)
......
231 269
            if delete_src:
232 270
                self._delete_account(src_account)
233 271

  
272
    @manage_transactions(lock_container_path=True)
234 273
    def merge_account(self, src_account, dest_account, only_stats=True,
235 274
                      dry=True, silent=False, delete_src=False):
236
        if src_account not in self.existing_accounts():
275
        if src_account not in self._existing_accounts():
237 276
            raise NameError('%s does not exist' % src_account)
238
        if dest_account not in self.existing_accounts():
277
        if dest_account not in self._existing_accounts():
239 278
            raise NameError('%s does not exist' % dest_account)
240 279

  
241 280
        if only_stats:
242 281
            print "The following %s's entries will be moved to %s:" \
243 282
                % (src_account, dest_account)
244
            print "Objects: %r" % self.list_all_objects(src_account)
283
            print "Objects: %r" % self._list_all_objects(src_account)
245 284
            print "Groups: %r" \
246 285
                % self.backend.get_account_groups(src_account,
247 286
                                                  src_account).keys()
248 287
            return
288
        self._merge_account(src_account, dest_account, delete_src)
249 289

  
250
        self.backend.wrapper.execute()
251
        try:
252
            self._merge_account(src_account, dest_account, delete_src)
253

  
290
        if not silent:
254 291
            if dry:
255
                if not silent:
256
                    print "Skipping database commit."
257
                self.backend.wrapper.rollback()
292
                print "Database commit skipped."
258 293
            else:
259
                self.backend.wrapper.commit()
260
                if not silent:
261
                    msg = "%s merged into %s."
262
                    print msg % (src_account, dest_account)
263
        except:
264
            self.backend.wrapper.rollback()
265
            raise
294
                print "%s has been merged into %s." % (src_account,
295
                                                       dest_account)
266 296

  
267
    def delete_container_contents(self, account, container):
297
    def _delete_container_contents(self, account, container):
268 298
        self.backend.delete_container(account, account, container,
269 299
                                      delimiter='/')
270 300

  
271
    def delete_container(self, account, container):
301
    @manage_transactions(lock_container_path=True)
302
    def delete_container_contents(self, account, container):
303
        return self._delete_container(account, account, container,
304
                                      delimiter='/')
305

  
306
    def _delete_container(self, account, container):
272 307
        self.backend.delete_container(account, account, container)
273 308

  
309
    @manage_transactions(lock_container_path=True)
310
    def delete_container(self, account, container):
311
        self._delete_container(account, account, container)
312

  
274 313
    def _delete_account(self, account):
275
        for c in self.list_all_containers(account):
276
            self.delete_container_contents(account, c)
277
            self.delete_container(account, c)
314
        for c in self._list_all_containers(account):
315
            self._delete_container_contents(account, c)
316
            self._delete_container(account, c)
278 317
        self.backend.delete_account(account, account)
279 318

  
319
    @manage_transactions(lock_container_path=True)
280 320
    def delete_account(self, account, only_stats=True, dry=True, silent=False):
281
        if account not in self.existing_accounts():
321
        if account not in self._existing_accounts():
282 322
            raise NameError('%s does not exist' % account)
283 323
        if only_stats:
284 324
            print "The following %s's entries will be removed:" % account
285
            print "Objects: %r" % self.list_all_objects(account)
325
            print "Objects: %r" % self._list_all_objects(account)
286 326
            print "Groups: %r" \
287 327
                % self.backend.get_account_groups(account, account).keys()
288 328
            return
329
        self._delete_account(account)
289 330

  
290
        self.backend.wrapper.execute()
291
        try:
292
            self._delete_account(account)
293

  
331
        if not silent:
294 332
            if dry:
295
                if not silent:
296
                    print "Skipping database commit."
297
                self.backend.wrapper.rollback()
333
                print "Database commit skipped."
298 334
            else:
299
                self.commit()
300
                if not silent:
301
                    print "%s is deleted." % account
302
        except:
303
            self.rollback()
304
            raise
335
                print "%s has been deleted." % account
305 336

  
337
    @manage_transactions(lock_container_path=True)
306 338
    def create_account(self, account):
307 339
        return self.backend._lookup_account(account, create=True)
308 340

  
341
    @manage_transactions(lock_container_path=True)
309 342
    def create_update_object(self, account, container, name, content_type,
310 343
                             data, meta=None, permissions=None,
311 344
                             request_user=None,
......
313 346
        meta = meta or {}
314 347
        permissions = permissions or {}
315 348

  
316
        assert checksum_compute_class in (NoChecksum, Checksum), 'Invalid checksum_compute_class'
349
        assert checksum_compute_class in (
350
            NoChecksum, Checksum), 'Invalid checksum_compute_class'
317 351
        checksum_compute = checksum_compute_class()
318 352
        size = 0
319 353
        hashmap = []
b/snf-pithos-app/pithos/api/management/commands/reconcile-commissions-pithos.py
42 42

  
43 43
CLIENTKEY = 'pithos'
44 44

  
45

  
45 46
class Command(NoArgsCommand):
46 47
    help = "Display unresolved commissions and trigger their recovery"
47 48

  
......
56 57
    def handle_noargs(self, **options):
57 58
        b = get_backend()
58 59
        try:
60
            b.pre_exec()
59 61
            pending_commissions = b.astakosclient.get_pending_commissions(
60 62
                token=b.service_token)
61 63

  
......
64 66
                    "Unresolved commissions: %s\n" % pending_commissions
65 67
                )
66 68
            else:
67
                self.stdout.write( "No unresolved commissions were found\n")
69
                self.stdout.write("No unresolved commissions were found\n")
68 70
                return
69 71

  
70 72
            if options['fix']:
......
78 80
                accepted = response['accepted']
79 81
                rejected = response['rejected']
80 82
                failed = response['failed']
81
                self.stdout.write("Accepted commissions: %s\n" %  accepted)
82
                self.stdout.write("Rejected commissions: %s\n" %  rejected)
83
                self.stdout.write("Accepted commissions: %s\n" % accepted)
84
                self.stdout.write("Rejected commissions: %s\n" % rejected)
83 85
                self.stdout.write("Failed commissions:\n")
84 86
                for i in failed:
85 87
                    self.stdout.write('%s\n' % i)
......
87 89
                b.commission_serials.delete_many(accepted)
88 90
        except Exception, e:
89 91
            logger.exception(e)
92
            b.post_exec(False)
90 93
            raise CommandError(e)
94
        else:
95
            b.post_exec(True)
91 96
        finally:
92 97
            b.close()
b/snf-pithos-app/pithos/api/management/commands/reconcile-resources-pithos.py
70 70

  
71 71
    def handle_noargs(self, **options):
72 72
        try:
73
            backend.pre_exec()
73 74
            qh_result = backend.astakosclient.service_get_quotas(
74 75
                backend.service_token)
75 76

  
......
142 143
                            "Reconciling failed because a limit has been "
143 144
                            "reached. Use --force to ignore the check.\n")
144 145
                        return
146
                    self.stdout.write("Fixed unsynced resources\n")
145 147

  
146 148
            if pending_exists:
147 149
                self.stdout.write(
......
150 152
                )
151 153
            elif not (unsynced or unknown_user_exists):
152 154
                self.stdout.write("Everything in sync.\n")
155
        except:
156
            backend.post_exec(False)
157
        else:
158
            backend.post_exec(True)
153 159
        finally:
154 160
            backend.close()
155 161

  
b/snf-pithos-app/pithos/api/util.py
1089 1089
            try:
1090 1090
                # Add a PithosBackend as attribute of the request object
1091 1091
                request.backend = get_backend()
1092
                request.backend.lock_container_path = lock_container_path
1093
                request.backend.pre_exec()
1092
                request.backend.pre_exec(lock_container_path)
1094 1093

  
1095 1094
                # Many API method expect thet X-Auth-Token in request,token
1096 1095
                request.token = request.x_auth_token
b/snf-pithos-backend/pithos/backends/modular.py
228 228
        self.serials = []
229 229
        self.messages = []
230 230

  
231
    def pre_exec(self):
231
    def pre_exec(self, lock_container_path=False):
232
        self.lock_container_path = lock_container_path
232 233
        self.wrapper.execute()
233 234

  
234 235
    def post_exec(self, success_status=True):

Also available in: Unified diff