Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 90ee1eb3

History | View | Annotate | Download (50.8 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43

    
44
# Stripped-down version of the HashMap class found in tools.
45
class HashMap(list):
46

    
47
    def __init__(self, blocksize, blockhash):
48
        super(HashMap, self).__init__()
49
        self.blocksize = blocksize
50
        self.blockhash = blockhash
51

    
52
    def _hash_raw(self, v):
53
        h = hashlib.new(self.blockhash)
54
        h.update(v)
55
        return h.digest()
56

    
57
    def hash(self):
58
        if len(self) == 0:
59
            return self._hash_raw('')
60
        if len(self) == 1:
61
            return self.__getitem__(0)
62

    
63
        h = list(self)
64
        s = 2
65
        while s < len(h):
66
            s = s * 2
67
        h += [('\x00' * len(h[0]))] * (s - len(h))
68
        while len(h) > 1:
69
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
70
        return h[0]
71

    
72
# Default modules and settings.
73
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
74
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
75
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
76
DEFAULT_BLOCK_PATH = 'data/'
77
DEFAULT_BLOCK_UMASK = 0o022
78
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
79
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
80

    
81
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
82
QUEUE_CLIENT_ID = 'pithos'
83
QUEUE_INSTANCE_ID = '1'
84

    
85
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
86

    
87
inf = float('inf')
88

    
89
ULTIMATE_ANSWER = 42
90

    
91

    
92
logger = logging.getLogger(__name__)
93

    
94

    
95
def backend_method(func=None, autocommit=1):
96
    if func is None:
97
        def fn(func):
98
            return backend_method(func, autocommit)
99
        return fn
100

    
101
    if not autocommit:
102
        return func
103
    def fn(self, *args, **kw):
104
        self.wrapper.execute()
105
        try:
106
            self.messages = []
107
            ret = func(self, *args, **kw)
108
            for m in self.messages:
109
                self.queue.send(*m)
110
            self.wrapper.commit()
111
            return ret
112
        except:
113
            self.wrapper.rollback()
114
            raise
115
    return fn
116

    
117

    
118
class ModularBackend(BaseBackend):
119
    """A modular backend.
120
    
121
    Uses modules for SQL functions and storage.
122
    """
123
    
124
    def __init__(self, db_module=None, db_connection=None,
125
                 block_module=None, block_path=None, block_umask=None,
126
                 queue_module=None, queue_connection=None):
127
        db_module = db_module or DEFAULT_DB_MODULE
128
        db_connection = db_connection or DEFAULT_DB_CONNECTION
129
        block_module = block_module or DEFAULT_BLOCK_MODULE
130
        block_path = block_path or DEFAULT_BLOCK_PATH
131
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
132
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
133
        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
134
        
135
        self.hash_algorithm = 'sha256'
136
        self.block_size = 4 * 1024 * 1024 # 4MB
137
        
138
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
139
        
140
        def load_module(m):
141
            __import__(m)
142
            return sys.modules[m]
143
        
144
        self.db_module = load_module(db_module)
145
        self.wrapper = self.db_module.DBWrapper(db_connection)
146
        params = {'wrapper': self.wrapper}
147
        self.permissions = self.db_module.Permissions(**params)
148
        for x in ['READ', 'WRITE']:
149
            setattr(self, x, getattr(self.db_module, x))
150
        self.node = self.db_module.Node(**params)
151
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
152
            setattr(self, x, getattr(self.db_module, x))
153
        
154
        self.block_module = load_module(block_module)
155
        params = {'path': block_path,
156
                  'block_size': self.block_size,
157
                  'hash_algorithm': self.hash_algorithm,
158
                  'umask': block_umask}
159
        self.store = self.block_module.Store(**params)
160

    
161
        if queue_module and queue_connection:
162
            self.queue_module = load_module(queue_module)
163
            params = {'exchange': queue_connection,
164
                      'client_id': QUEUE_CLIENT_ID}
165
            self.queue = self.queue_module.Queue(**params)
166
        else:
167
            class NoQueue:
168
                def send(self, *args):
169
                    pass
170
                
171
                def close(self):
172
                    pass
173
            
174
            self.queue = NoQueue()
175
    
176
    def close(self):
177
        self.wrapper.close()
178
        self.queue.close()
179
    
180
    @backend_method
181
    def list_accounts(self, user, marker=None, limit=10000):
182
        """Return a list of accounts the user can access."""
183
        
184
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
185
        allowed = self._allowed_accounts(user)
186
        start, limit = self._list_limits(allowed, marker, limit)
187
        return allowed[start:start + limit]
188
    
189
    @backend_method
190
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
191
        """Return a dictionary with the account metadata for the domain."""
192
        
193
        logger.debug("get_account_meta: %s %s %s", account, domain, until)
194
        path, node = self._lookup_account(account, user == account)
195
        if user != account:
196
            if until or node is None or account not in self._allowed_accounts(user):
197
                raise NotAllowedError
198
        try:
199
            props = self._get_properties(node, until)
200
            mtime = props[self.MTIME]
201
        except NameError:
202
            props = None
203
            mtime = until
204
        count, bytes, tstamp = self._get_statistics(node, until)
205
        tstamp = max(tstamp, mtime)
206
        if until is None:
207
            modified = tstamp
208
        else:
209
            modified = self._get_statistics(node)[2] # Overall last modification.
210
            modified = max(modified, mtime)
211
        
212
        if user != account:
213
            meta = {'name': account}
214
        else:
215
            meta = {}
216
            if props is not None and include_user_defined:
217
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
218
            if until is not None:
219
                meta.update({'until_timestamp': tstamp})
220
            meta.update({'name': account, 'count': count, 'bytes': bytes})
221
        meta.update({'modified': modified})
222
        return meta
223
    
224
    @backend_method
225
    def update_account_meta(self, user, account, domain, meta, replace=False):
226
        """Update the metadata associated with the account for the domain."""
227
        
228
        logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
229
        if user != account:
230
            raise NotAllowedError
231
        path, node = self._lookup_account(account, True)
232
        self._put_metadata(user, node, domain, meta, replace)
233
    
234
    @backend_method
235
    def get_account_groups(self, user, account):
236
        """Return a dictionary with the user groups defined for this account."""
237
        
238
        logger.debug("get_account_groups: %s", account)
239
        if user != account:
240
            if account not in self._allowed_accounts(user):
241
                raise NotAllowedError
242
            return {}
243
        self._lookup_account(account, True)
244
        return self.permissions.group_dict(account)
245
    
246
    @backend_method
247
    def update_account_groups(self, user, account, groups, replace=False):
248
        """Update the groups associated with the account."""
249
        
250
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
251
        if user != account:
252
            raise NotAllowedError
253
        self._lookup_account(account, True)
254
        self._check_groups(groups)
255
        if replace:
256
            self.permissions.group_destroy(account)
257
        for k, v in groups.iteritems():
258
            if not replace: # If not already deleted.
259
                self.permissions.group_delete(account, k)
260
            if v:
261
                self.permissions.group_addmany(account, k, v)
262
    
263
    @backend_method
264
    def get_account_policy(self, user, account):
265
        """Return a dictionary with the account policy."""
266
        
267
        logger.debug("get_account_policy: %s", account)
268
        if user != account:
269
            if account not in self._allowed_accounts(user):
270
                raise NotAllowedError
271
            return {}
272
        path, node = self._lookup_account(account, True)
273
        return self._get_policy(node)
274
    
275
    @backend_method
276
    def update_account_policy(self, user, account, policy, replace=False):
277
        """Update the policy associated with the account."""
278
        
279
        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
280
        if user != account:
281
            raise NotAllowedError
282
        path, node = self._lookup_account(account, True)
283
        self._check_policy(policy)
284
        self._put_policy(node, policy, replace)
285
    
286
    @backend_method
287
    def put_account(self, user, account, policy={}):
288
        """Create a new account with the given name."""
289
        
290
        logger.debug("put_account: %s %s", account, policy)
291
        if user != account:
292
            raise NotAllowedError
293
        node = self.node.node_lookup(account)
294
        if node is not None:
295
            raise NameError('Account already exists')
296
        if policy:
297
            self._check_policy(policy)
298
        node = self._put_path(user, self.ROOTNODE, account)
299
        self._put_policy(node, policy, True)
300
    
301
    @backend_method
302
    def delete_account(self, user, account):
303
        """Delete the account with the given name."""
304
        
305
        logger.debug("delete_account: %s", account)
306
        if user != account:
307
            raise NotAllowedError
308
        node = self.node.node_lookup(account)
309
        if node is None:
310
            return
311
        if not self.node.node_remove(node):
312
            raise IndexError('Account is not empty')
313
        self.permissions.group_destroy(account)
314
    
315
    @backend_method
316
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
317
        """Return a list of containers existing under an account."""
318
        
319
        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
320
        if user != account:
321
            if until or account not in self._allowed_accounts(user):
322
                raise NotAllowedError
323
            allowed = self._allowed_containers(user, account)
324
            start, limit = self._list_limits(allowed, marker, limit)
325
            return allowed[start:start + limit]
326
        if shared or public:
327
            allowed = []
328
            if shared:
329
                allowed.extend([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
330
            if public:
331
                allowed.extend([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
332
            allowed = list(set(allowed))
333
            start, limit = self._list_limits(allowed, marker, limit)
334
            return allowed[start:start + limit]
335
        node = self.node.node_lookup(account)
336
        return [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
337
    
338
    @backend_method
339
    def list_container_meta(self, user, account, container, domain, until=None):
340
        """Return a list with all the container's object meta keys for the domain."""
341
        
342
        logger.debug("list_container_meta: %s %s %s %s", account, container, domain, until)
343
        allowed = []
344
        if user != account:
345
            if until:
346
                raise NotAllowedError
347
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
348
            if not allowed:
349
                raise NotAllowedError
350
        path, node = self._lookup_container(account, container)
351
        before = until if until is not None else inf
352
        allowed = self._get_formatted_paths(allowed)
353
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
354
    
355
    @backend_method
356
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
357
        """Return a dictionary with the container metadata for the domain."""
358
        
359
        logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
360
        if user != account:
361
            if until or container not in self._allowed_containers(user, account):
362
                raise NotAllowedError
363
        path, node = self._lookup_container(account, container)
364
        props = self._get_properties(node, until)
365
        mtime = props[self.MTIME]
366
        count, bytes, tstamp = self._get_statistics(node, until)
367
        tstamp = max(tstamp, mtime)
368
        if until is None:
369
            modified = tstamp
370
        else:
371
            modified = self._get_statistics(node)[2] # Overall last modification.
372
            modified = max(modified, mtime)
373
        
374
        if user != account:
375
            meta = {'name': container}
376
        else:
377
            meta = {}
378
            if include_user_defined:
379
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
380
            if until is not None:
381
                meta.update({'until_timestamp': tstamp})
382
            meta.update({'name': container, 'count': count, 'bytes': bytes})
383
        meta.update({'modified': modified})
384
        return meta
385
    
386
    @backend_method
387
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
388
        """Update the metadata associated with the container for the domain."""
389
        
390
        logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
391
        if user != account:
392
            raise NotAllowedError
393
        path, node = self._lookup_container(account, container)
394
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
395
        if src_version_id is not None:
396
            versioning = self._get_policy(node)['versioning']
397
            if versioning != 'auto':
398
                self.node.version_remove(src_version_id)
399
    
400
    @backend_method
401
    def get_container_policy(self, user, account, container):
402
        """Return a dictionary with the container policy."""
403
        
404
        logger.debug("get_container_policy: %s %s", account, container)
405
        if user != account:
406
            if container not in self._allowed_containers(user, account):
407
                raise NotAllowedError
408
            return {}
409
        path, node = self._lookup_container(account, container)
410
        return self._get_policy(node)
411
    
412
    @backend_method
413
    def update_container_policy(self, user, account, container, policy, replace=False):
414
        """Update the policy associated with the container."""
415
        
416
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
417
        if user != account:
418
            raise NotAllowedError
419
        path, node = self._lookup_container(account, container)
420
        self._check_policy(policy)
421
        self._put_policy(node, policy, replace)
422
    
423
    @backend_method
424
    def put_container(self, user, account, container, policy={}):
425
        """Create a new container with the given name."""
426
        
427
        logger.debug("put_container: %s %s %s", account, container, policy)
428
        if user != account:
429
            raise NotAllowedError
430
        try:
431
            path, node = self._lookup_container(account, container)
432
        except NameError:
433
            pass
434
        else:
435
            raise NameError('Container already exists')
436
        if policy:
437
            self._check_policy(policy)
438
        path = '/'.join((account, container))
439
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
440
        self._put_policy(node, policy, True)
441
    
442
    @backend_method
443
    def delete_container(self, user, account, container, until=None):
444
        """Delete/purge the container with the given name."""
445
        
446
        logger.debug("delete_container: %s %s %s", account, container, until)
447
        if user != account:
448
            raise NotAllowedError
449
        path, node = self._lookup_container(account, container)
450
        
451
        if until is not None:
452
            hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
453
            for h in hashes:
454
                self.store.map_delete(h)
455
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
456
            self._report_size_change(user, account, -size, {'action': 'container purge'})
457
            return
458
        
459
        if self._get_statistics(node)[0] > 0:
460
            raise IndexError('Container is not empty')
461
        hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
462
        for h in hashes:
463
            self.store.map_delete(h)
464
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
465
        self.node.node_remove(node)
466
        self._report_size_change(user, account, -size, {'action': 'container delete'})
467
    
468
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
469
        if user != account and until:
470
            raise NotAllowedError
471
        allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
472
        if (shared or public) and not allowed:
473
            return []
474
        path, node = self._lookup_container(account, container)
475
        allowed = self._get_formatted_paths(allowed)
476
        return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
477
    
478
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
479
        allowed = []
480
        path = '/'.join((account, container, prefix)).rstrip('/')
481
        if user != account:
482
            allowed = self.permissions.access_list_paths(user, path)
483
            if not allowed:
484
                raise NotAllowedError
485
        else:
486
            if shared:
487
                allowed = self.permissions.access_list_shared(path)
488
                allowed.extend([x[0] for x in self.permissions.public_list(path)])
489
                allowed = list(set(allowed))
490
                if not allowed:
491
                    return []
492
        return allowed
493
    
494
    @backend_method
495
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
496
        """Return a list of object (name, version_id) tuples existing under a container."""
497
        
498
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
499
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
500
    
501
    @backend_method
502
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
503
        """Return a list of object metadata dicts existing under a container."""
504
        
505
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
506
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
507
        objects = []
508
        for p in props:
509
            if len(p) == 2:
510
                objects.append({'subdir': p[0]})
511
            else:
512
                objects.append({'name': p[0],
513
                                'bytes': p[self.SIZE + 1],
514
                                'type': p[self.TYPE + 1],
515
                                'hash': p[self.HASH + 1],
516
                                'version': p[self.SERIAL + 1],
517
                                'version_timestamp': p[self.MTIME + 1],
518
                                'modified': p[self.MTIME + 1] if until is None else None,
519
                                'modified_by': p[self.MUSER + 1],
520
                                'uuid': p[self.UUID + 1],
521
                                'checksum': p[self.CHECKSUM + 1]})
522
        return objects
523
    
524
    @backend_method
525
    def list_object_permissions(self, user, account, container, prefix=''):
526
        """Return a list of paths that enforce permissions under a container."""
527
        
528
        logger.debug("list_object_permissions: %s %s %s", account, container, prefix)
529
        return self._list_object_permissions(user, account, container, prefix, True, False)
530
    
531
    @backend_method
532
    def list_object_public(self, user, account, container, prefix=''):
533
        """Return a dict mapping paths to public ids for objects that are public under a container."""
534
        
535
        logger.debug("list_object_public: %s %s %s", account, container, prefix)
536
        public = {}
537
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
538
            public[path] = p + ULTIMATE_ANSWER
539
        return public
540
    
541
    @backend_method
542
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
543
        """Return a dictionary with the object metadata for the domain."""
544
        
545
        logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
546
        self._can_read(user, account, container, name)
547
        path, node = self._lookup_object(account, container, name)
548
        props = self._get_version(node, version)
549
        if version is None:
550
            modified = props[self.MTIME]
551
        else:
552
            try:
553
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
554
            except NameError: # Object may be deleted.
555
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
556
                if del_props is None:
557
                    raise NameError('Object does not exist')
558
                modified = del_props[self.MTIME]
559
        
560
        meta = {}
561
        if include_user_defined:
562
            meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
563
        meta.update({'name': name,
564
                     'bytes': props[self.SIZE],
565
                     'type': props[self.TYPE],
566
                     'hash': props[self.HASH],
567
                     'version': props[self.SERIAL],
568
                     'version_timestamp': props[self.MTIME],
569
                     'modified': modified,
570
                     'modified_by': props[self.MUSER],
571
                     'uuid': props[self.UUID],
572
                     'checksum': props[self.CHECKSUM]})
573
        return meta
574
    
575
    @backend_method
576
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
577
        """Update the metadata associated with the object for the domain and return the new version."""
578
        
579
        logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
580
        self._can_write(user, account, container, name)
581
        path, node = self._lookup_object(account, container, name)
582
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
583
        self._apply_versioning(account, container, src_version_id)
584
        return dest_version_id
585
    
586
    @backend_method
587
    def get_object_permissions(self, user, account, container, name):
588
        """Return the action allowed on the object, the path
589
        from which the object gets its permissions from,
590
        along with a dictionary containing the permissions."""
591
        
592
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
593
        allowed = 'write'
594
        permissions_path = self._get_permissions_path(account, container, name)
595
        if user != account:
596
            if self.permissions.access_check(permissions_path, self.WRITE, user):
597
                allowed = 'write'
598
            elif self.permissions.access_check(permissions_path, self.READ, user):
599
                allowed = 'read'
600
            else:
601
                raise NotAllowedError
602
        self._lookup_object(account, container, name)
603
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
604
    
605
    @backend_method
606
    def update_object_permissions(self, user, account, container, name, permissions):
607
        """Update the permissions associated with the object."""
608
        
609
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
610
        if user != account:
611
            raise NotAllowedError
612
        path = self._lookup_object(account, container, name)[0]
613
        self._check_permissions(path, permissions)
614
        self.permissions.access_set(path, permissions)
615
        self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
616
    
617
    @backend_method
618
    def get_object_public(self, user, account, container, name):
619
        """Return the public id of the object if applicable."""
620
        
621
        logger.debug("get_object_public: %s %s %s", account, container, name)
622
        self._can_read(user, account, container, name)
623
        path = self._lookup_object(account, container, name)[0]
624
        p = self.permissions.public_get(path)
625
        if p is not None:
626
            p += ULTIMATE_ANSWER
627
        return p
628
    
629
    @backend_method
630
    def update_object_public(self, user, account, container, name, public):
631
        """Update the public status of the object."""
632
        
633
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
634
        self._can_write(user, account, container, name)
635
        path = self._lookup_object(account, container, name)[0]
636
        if not public:
637
            self.permissions.public_unset(path)
638
        else:
639
            self.permissions.public_set(path)
640
    
641
    @backend_method
642
    def get_object_hashmap(self, user, account, container, name, version=None):
643
        """Return the object's size and a list with partial hashes."""
644
        
645
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
646
        self._can_read(user, account, container, name)
647
        path, node = self._lookup_object(account, container, name)
648
        props = self._get_version(node, version)
649
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
650
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
651
    
652
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
653
        if permissions is not None and user != account:
654
            raise NotAllowedError
655
        self._can_write(user, account, container, name)
656
        if permissions is not None:
657
            path = '/'.join((account, container, name))
658
            self._check_permissions(path, permissions)
659
        
660
        account_path, account_node = self._lookup_account(account, True)
661
        container_path, container_node = self._lookup_container(account, container)
662
        path, node = self._put_object_node(container_path, container_node, name)
663
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
664
        
665
        # Handle meta.
666
        if src_version_id is None:
667
            src_version_id = pre_version_id
668
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
669
        
670
        # Check quota.
671
        del_size = self._apply_versioning(account, container, pre_version_id)
672
        size_delta = size - del_size
673
        if size_delta > 0:
674
            account_quota = long(self._get_policy(account_node)['quota'])
675
            container_quota = long(self._get_policy(container_node)['quota'])
676
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
677
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
678
                # This must be executed in a transaction, so the version is never created if it fails.
679
                raise QuotaError
680
        self._report_size_change(user, account, size_delta, {'action': 'object update'})
681
        
682
        if permissions is not None:
683
            self.permissions.access_set(path, permissions)
684
            self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
685
        
686
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
687
        return dest_version_id
688
    
689
    @backend_method
690
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
691
        """Create/update an object with the specified size and partial hashes."""
692
        
693
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s", account, container, name, size, type, hashmap, checksum)
694
        if size == 0: # No such thing as an empty hashmap.
695
            hashmap = [self.put_block('')]
696
        map = HashMap(self.block_size, self.hash_algorithm)
697
        map.extend([binascii.unhexlify(x) for x in hashmap])
698
        missing = self.store.block_search(map)
699
        if missing:
700
            ie = IndexError()
701
            ie.data = [binascii.hexlify(x) for x in missing]
702
            raise ie
703
        
704
        hash = map.hash()
705
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
706
        self.store.map_put(hash, map)
707
        return dest_version_id
708
    
709
    @backend_method
710
    def update_object_checksum(self, user, account, container, name, version, checksum):
711
        """Update an object's checksum."""
712
        
713
        logger.debug("update_object_checksum: %s %s %s %s %s", account, container, name, version, checksum)
714
        # Update objects with greater version and same hashmap and size (fix metadata updates).
715
        self._can_write(user, account, container, name)
716
        path, node = self._lookup_object(account, container, name)
717
        props = self._get_version(node, version)
718
        versions = self.node.node_get_versions(node)
719
        for x in versions:
720
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
721
                self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
722
    
723
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
724
        self._can_read(user, src_account, src_container, src_name)
725
        path, node = self._lookup_object(src_account, src_container, src_name)
726
        # TODO: Will do another fetch of the properties in duplicate version...
727
        props = self._get_version(node, src_version) # Check to see if source exists.
728
        src_version_id = props[self.SERIAL]
729
        hash = props[self.HASH]
730
        size = props[self.SIZE]
731
        
732
        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
733
        dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy)
734
        return dest_version_id
735
    
736
    @backend_method
737
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
738
        """Copy an object's data and metadata."""
739
        
740
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
741
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False)
742
        return dest_version_id
743
    
744
    @backend_method
745
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
746
        """Move an object's data and metadata."""
747
        
748
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
749
        if user != src_account:
750
            raise NotAllowedError
751
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True)
752
        if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
753
            self._delete_object(user, src_account, src_container, src_name)
754
        return dest_version_id
755
    
756
    def _delete_object(self, user, account, container, name, until=None):
757
        if user != account:
758
            raise NotAllowedError
759
        
760
        if until is not None:
761
            path = '/'.join((account, container, name))
762
            node = self.node.node_lookup(path)
763
            if node is None:
764
                return
765
            hashes = []
766
            size = 0
767
            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
768
            hashes += h
769
            size += s
770
            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
771
            hashes += h
772
            size += s
773
            for h in hashes:
774
                self.store.map_delete(h)
775
            self.node.node_purge(node, until, CLUSTER_DELETED)
776
            try:
777
                props = self._get_version(node)
778
            except NameError:
779
                self.permissions.access_clear(path)
780
            self._report_size_change(user, account, -size, {'action': 'object purge'})
781
            return
782
        
783
        path, node = self._lookup_object(account, container, name)
784
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
785
        del_size = self._apply_versioning(account, container, src_version_id)
786
        if del_size:
787
            self._report_size_change(user, account, -del_size, {'action': 'object delete'})
788
        self._report_object_change(user, account, path, details={'action': 'object delete'})
789
        self.permissions.access_clear(path)
790
    
791
    @backend_method
792
    def delete_object(self, user, account, container, name, until=None):
793
        """Delete/purge an object."""
794
        
795
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
796
        self._delete_object(user, account, container, name, until)
797
    
798
    @backend_method
799
    def list_versions(self, user, account, container, name):
800
        """Return a list of all (version, version_timestamp) tuples for an object."""
801
        
802
        logger.debug("list_versions: %s %s %s", account, container, name)
803
        self._can_read(user, account, container, name)
804
        path, node = self._lookup_object(account, container, name)
805
        versions = self.node.node_get_versions(node)
806
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
807
    
808
    @backend_method
809
    def get_uuid(self, user, uuid):
810
        """Return the (account, container, name) for the UUID given."""
811
        
812
        logger.debug("get_uuid: %s", uuid)
813
        info = self.node.latest_uuid(uuid)
814
        if info is None:
815
            raise NameError
816
        path, serial = info
817
        account, container, name = path.split('/', 2)
818
        self._can_read(user, account, container, name)
819
        return (account, container, name)
820
    
821
    @backend_method
822
    def get_public(self, user, public):
823
        """Return the (account, container, name) for the public id given."""
824
        
825
        logger.debug("get_public: %s", public)
826
        if public is None or public < ULTIMATE_ANSWER:
827
            raise NameError
828
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
829
        if path is None:
830
            raise NameError
831
        account, container, name = path.split('/', 2)
832
        self._can_read(user, account, container, name)
833
        return (account, container, name)
834
    
835
    @backend_method(autocommit=0)
836
    def get_block(self, hash):
837
        """Return a block's data."""
838
        
839
        logger.debug("get_block: %s", hash)
840
        block = self.store.block_get(binascii.unhexlify(hash))
841
        if not block:
842
            raise NameError('Block does not exist')
843
        return block
844
    
845
    @backend_method(autocommit=0)
846
    def put_block(self, data):
847
        """Store a block and return the hash."""
848
        
849
        logger.debug("put_block: %s", len(data))
850
        return binascii.hexlify(self.store.block_put(data))
851
    
852
    @backend_method(autocommit=0)
853
    def update_block(self, hash, data, offset=0):
854
        """Update a known block and return the hash."""
855
        
856
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
857
        if offset == 0 and len(data) == self.block_size:
858
            return self.put_block(data)
859
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
860
        return binascii.hexlify(h)
861
    
862
    # Path functions.
863
    
864
    def _generate_uuid(self):
865
        return str(uuidlib.uuid4())
866
    
867
    def _put_object_node(self, path, parent, name):
868
        path = '/'.join((path, name))
869
        node = self.node.node_lookup(path)
870
        if node is None:
871
            node = self.node.node_create(parent, path)
872
        return path, node
873
    
874
    def _put_path(self, user, parent, path):
875
        node = self.node.node_create(parent, path)
876
        self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
877
        return node
878
    
879
    def _lookup_account(self, account, create=True):
880
        node = self.node.node_lookup(account)
881
        if node is None and create:
882
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
883
        return account, node
884
    
885
    def _lookup_container(self, account, container):
886
        path = '/'.join((account, container))
887
        node = self.node.node_lookup(path)
888
        if node is None:
889
            raise NameError('Container does not exist')
890
        return path, node
891
    
892
    def _lookup_object(self, account, container, name):
893
        path = '/'.join((account, container, name))
894
        node = self.node.node_lookup(path)
895
        if node is None:
896
            raise NameError('Object does not exist')
897
        return path, node
898
    
899
    def _get_properties(self, node, until=None):
900
        """Return properties until the timestamp given."""
901
        
902
        before = until if until is not None else inf
903
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
904
        if props is None and until is not None:
905
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
906
        if props is None:
907
            raise NameError('Path does not exist')
908
        return props
909
    
910
    def _get_statistics(self, node, until=None):
911
        """Return count, sum of size and latest timestamp of everything under node."""
912
        
913
        if until is None:
914
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
915
        else:
916
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
917
        if stats is None:
918
            stats = (0, 0, 0)
919
        return stats
920
    
921
    def _get_version(self, node, version=None):
922
        if version is None:
923
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
924
            if props is None:
925
                raise NameError('Object does not exist')
926
        else:
927
            try:
928
                version = int(version)
929
            except ValueError:
930
                raise IndexError('Version does not exist')
931
            props = self.node.version_get_properties(version)
932
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
933
                raise IndexError('Version does not exist')
934
        return props
935
    
936
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
937
        """Create a new version of the node."""
938
        
939
        props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
940
        if props is not None:
941
            src_version_id = props[self.SERIAL]
942
            src_hash = props[self.HASH]
943
            src_size = props[self.SIZE]
944
            src_type = props[self.TYPE]
945
            src_checksum = props[self.CHECKSUM]
946
        else:
947
            src_version_id = None
948
            src_hash = None
949
            src_size = 0
950
            src_type = ''
951
            src_checksum = ''
952
        if size is None: # Set metadata.
953
            hash = src_hash # This way hash can be set to None (account or container).
954
            size = src_size
955
        if type is None:
956
            type = src_type
957
        if checksum is None:
958
            checksum = src_checksum
959
        uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
960
        
961
        if src_node is None:
962
            pre_version_id = src_version_id
963
        else:
964
            pre_version_id = None
965
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
966
            if props is not None:
967
                pre_version_id = props[self.SERIAL]
968
        if pre_version_id is not None:
969
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
970
        
971
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
972
        return pre_version_id, dest_version_id
973
    
974
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
975
        if src_version_id is not None:
976
            self.node.attribute_copy(src_version_id, dest_version_id)
977
        if not replace:
978
            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
979
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
980
        else:
981
            self.node.attribute_del(dest_version_id, domain)
982
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
983
    
984
    def _put_metadata(self, user, node, domain, meta, replace=False):
985
        """Create a new version and store metadata."""
986
        
987
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
988
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
989
        return src_version_id, dest_version_id
990
    
991
    def _list_limits(self, listing, marker, limit):
992
        start = 0
993
        if marker:
994
            try:
995
                start = listing.index(marker) + 1
996
            except ValueError:
997
                pass
998
        if not limit or limit > 10000:
999
            limit = 10000
1000
        return start, limit
1001
    
1002
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1003
        cont_prefix = path + '/'
1004
        prefix = cont_prefix + prefix
1005
        start = cont_prefix + marker if marker else None
1006
        before = until if until is not None else inf
1007
        filterq = keys if domain else []
1008
        sizeq = size_range
1009
        
1010
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1011
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1012
        objects.sort(key=lambda x: x[0])
1013
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1014
        
1015
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
1016
        return objects[start:start + limit]
1017
    
1018
    # Reporting functions.
1019
    
1020
    def _report_size_change(self, user, account, size, details={}):
1021
        logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1022
        account_node = self._lookup_account(account, True)[1]
1023
        total = self._get_statistics(account_node)[1]
1024
        details.update({'user': user, 'total': total})
1025
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1026
    
1027
    def _report_object_change(self, user, account, path, details={}):
1028
        logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1029
        details.update({'user': user})
1030
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1031
    
1032
    def _report_sharing_change(self, user, account, path, details={}):
1033
        logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1034
        details.update({'user': user})
1035
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1036
    
1037
    # Policy functions.
1038
    
1039
    def _check_policy(self, policy):
1040
        for k in policy.keys():
1041
            if policy[k] == '':
1042
                policy[k] = self.default_policy.get(k)
1043
        for k, v in policy.iteritems():
1044
            if k == 'quota':
1045
                q = int(v) # May raise ValueError.
1046
                if q < 0:
1047
                    raise ValueError
1048
            elif k == 'versioning':
1049
                if v not in ['auto', 'none']:
1050
                    raise ValueError
1051
            else:
1052
                raise ValueError
1053
    
1054
    def _put_policy(self, node, policy, replace):
1055
        if replace:
1056
            for k, v in self.default_policy.iteritems():
1057
                if k not in policy:
1058
                    policy[k] = v
1059
        self.node.policy_set(node, policy)
1060
    
1061
    def _get_policy(self, node):
1062
        policy = self.default_policy.copy()
1063
        policy.update(self.node.policy_get(node))
1064
        return policy
1065
    
1066
    def _apply_versioning(self, account, container, version_id):
1067
        """Delete the provided version if such is the policy.
1068
           Return size of object removed.
1069
        """
1070
        
1071
        if version_id is None:
1072
            return 0
1073
        path, node = self._lookup_container(account, container)
1074
        versioning = self._get_policy(node)['versioning']
1075
        if versioning != 'auto':
1076
            hash, size = self.node.version_remove(version_id)
1077
            self.store.map_delete(hash)
1078
            return size
1079
        return 0
1080
    
1081
    # Access control functions.
1082
    
1083
    def _check_groups(self, groups):
1084
        # raise ValueError('Bad characters in groups')
1085
        pass
1086
    
1087
    def _check_permissions(self, path, permissions):
1088
        # raise ValueError('Bad characters in permissions')
1089
        pass
1090
    
1091
    def _get_formatted_paths(self, paths):
1092
        formatted = []
1093
        for p in paths:
1094
            node = self.node.node_lookup(p)
1095
            if node is not None:
1096
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1097
            if props is not None:
1098
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1099
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1100
                formatted.append((p, self.MATCH_EXACT))
1101
        return formatted
1102
    
1103
    def _get_permissions_path(self, account, container, name):
1104
        path = '/'.join((account, container, name))
1105
        permission_paths = self.permissions.access_inherit(path)
1106
        permission_paths.sort()
1107
        permission_paths.reverse()
1108
        for p in permission_paths:
1109
            if p == path:
1110
                return p
1111
            else:
1112
                if p.count('/') < 2:
1113
                    continue
1114
                node = self.node.node_lookup(p)
1115
                if node is not None:
1116
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1117
                if props is not None:
1118
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1119
                        return p
1120
        return None
1121
    
1122
    def _can_read(self, user, account, container, name):
1123
        if user == account:
1124
            return True
1125
        path = '/'.join((account, container, name))
1126
        if self.permissions.public_get(path) is not None:
1127
            return True
1128
        path = self._get_permissions_path(account, container, name)
1129
        if not path:
1130
            raise NotAllowedError
1131
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1132
            raise NotAllowedError
1133
    
1134
    def _can_write(self, user, account, container, name):
1135
        if user == account:
1136
            return True
1137
        path = '/'.join((account, container, name))
1138
        path = self._get_permissions_path(account, container, name)
1139
        if not path:
1140
            raise NotAllowedError
1141
        if not self.permissions.access_check(path, self.WRITE, user):
1142
            raise NotAllowedError
1143
    
1144
    def _allowed_accounts(self, user):
1145
        allow = set()
1146
        for path in self.permissions.access_list_paths(user):
1147
            allow.add(path.split('/', 1)[0])
1148
        return sorted(allow)
1149
    
1150
    def _allowed_containers(self, user, account):
1151
        allow = set()
1152
        for path in self.permissions.access_list_paths(user, account):
1153
            allow.add(path.split('/', 2)[1])
1154
        return sorted(allow)