Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ fe2db49d

History | View | Annotate | Download (51.3 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43

    
44
# Stripped-down version of the HashMap class found in tools.
45
class HashMap(list):
46

    
47
    def __init__(self, blocksize, blockhash):
48
        super(HashMap, self).__init__()
49
        self.blocksize = blocksize
50
        self.blockhash = blockhash
51

    
52
    def _hash_raw(self, v):
53
        h = hashlib.new(self.blockhash)
54
        h.update(v)
55
        return h.digest()
56

    
57
    def hash(self):
58
        if len(self) == 0:
59
            return self._hash_raw('')
60
        if len(self) == 1:
61
            return self.__getitem__(0)
62

    
63
        h = list(self)
64
        s = 2
65
        while s < len(h):
66
            s = s * 2
67
        h += [('\x00' * len(h[0]))] * (s - len(h))
68
        while len(h) > 1:
69
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
70
        return h[0]
71

    
72
# Default modules and settings.
73
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
74
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
75
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
76
DEFAULT_BLOCK_PATH = 'data/'
77
DEFAULT_BLOCK_UMASK = 0o022
78
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
79
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
80

    
81
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
82
QUEUE_CLIENT_ID = 'pithos'
83
QUEUE_INSTANCE_ID = '1'
84

    
85
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
86

    
87
inf = float('inf')
88

    
89
ULTIMATE_ANSWER = 42
90

    
91

    
92
logger = logging.getLogger(__name__)
93

    
94

    
95
def backend_method(func=None, autocommit=1):
96
    if func is None:
97
        def fn(func):
98
            return backend_method(func, autocommit)
99
        return fn
100

    
101
    if not autocommit:
102
        return func
103
    def fn(self, *args, **kw):
104
        self.wrapper.execute()
105
        try:
106
            self.messages = []
107
            ret = func(self, *args, **kw)
108
            for m in self.messages:
109
                self.queue.send(*m)
110
            self.wrapper.commit()
111
            return ret
112
        except:
113
            self.wrapper.rollback()
114
            raise
115
    return fn
116

    
117

    
118
class ModularBackend(BaseBackend):
119
    """A modular backend.
120
    
121
    Uses modules for SQL functions and storage.
122
    """
123
    
124
    def __init__(self, db_module=None, db_connection=None,
125
                 block_module=None, block_path=None, block_umask=None,
126
                 queue_module=None, queue_connection=None):
127
        db_module = db_module or DEFAULT_DB_MODULE
128
        db_connection = db_connection or DEFAULT_DB_CONNECTION
129
        block_module = block_module or DEFAULT_BLOCK_MODULE
130
        block_path = block_path or DEFAULT_BLOCK_PATH
131
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
132
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
133
        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
134
        
135
        self.hash_algorithm = 'sha256'
136
        self.block_size = 4 * 1024 * 1024 # 4MB
137
        
138
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
139
        
140
        def load_module(m):
141
            __import__(m)
142
            return sys.modules[m]
143
        
144
        self.db_module = load_module(db_module)
145
        self.wrapper = self.db_module.DBWrapper(db_connection)
146
        params = {'wrapper': self.wrapper}
147
        self.permissions = self.db_module.Permissions(**params)
148
        for x in ['READ', 'WRITE']:
149
            setattr(self, x, getattr(self.db_module, x))
150
        self.node = self.db_module.Node(**params)
151
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
152
            setattr(self, x, getattr(self.db_module, x))
153
        
154
        self.block_module = load_module(block_module)
155
        params = {'path': block_path,
156
                  'block_size': self.block_size,
157
                  'hash_algorithm': self.hash_algorithm,
158
                  'umask': block_umask}
159
        self.store = self.block_module.Store(**params)
160

    
161
        if queue_module and queue_connection:
162
            self.queue_module = load_module(queue_module)
163
            params = {'exchange': queue_connection,
164
                      'client_id': QUEUE_CLIENT_ID}
165
            self.queue = self.queue_module.Queue(**params)
166
        else:
167
            class NoQueue:
168
                def send(self, *args):
169
                    pass
170
                
171
                def close(self):
172
                    pass
173
            
174
            self.queue = NoQueue()
175
    
176
    def close(self):
177
        self.wrapper.close()
178
        self.queue.close()
179
    
180
    @backend_method
181
    def list_accounts(self, user, marker=None, limit=10000):
182
        """Return a list of accounts the user can access."""
183
        
184
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
185
        allowed = self._allowed_accounts(user)
186
        start, limit = self._list_limits(allowed, marker, limit)
187
        return allowed[start:start + limit]
188
    
189
    @backend_method
190
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
191
        """Return a dictionary with the account metadata for the domain."""
192
        
193
        logger.debug("get_account_meta: %s %s %s %s", user, account, domain, until)
194
        path, node = self._lookup_account(account, user == account)
195
        if user != account:
196
            if until or node is None or account not in self._allowed_accounts(user):
197
                raise NotAllowedError
198
        try:
199
            props = self._get_properties(node, until)
200
            mtime = props[self.MTIME]
201
        except NameError:
202
            props = None
203
            mtime = until
204
        count, bytes, tstamp = self._get_statistics(node, until)
205
        tstamp = max(tstamp, mtime)
206
        if until is None:
207
            modified = tstamp
208
        else:
209
            modified = self._get_statistics(node)[2] # Overall last modification.
210
            modified = max(modified, mtime)
211
        
212
        if user != account:
213
            meta = {'name': account}
214
        else:
215
            meta = {}
216
            if props is not None and include_user_defined:
217
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
218
            if until is not None:
219
                meta.update({'until_timestamp': tstamp})
220
            meta.update({'name': account, 'count': count, 'bytes': bytes})
221
        meta.update({'modified': modified})
222
        return meta
223
    
224
    @backend_method
225
    def update_account_meta(self, user, account, domain, meta, replace=False):
226
        """Update the metadata associated with the account for the domain."""
227
        
228
        logger.debug("update_account_meta: %s %s %s %s %s", user, account, domain, meta, replace)
229
        if user != account:
230
            raise NotAllowedError
231
        path, node = self._lookup_account(account, True)
232
        self._put_metadata(user, node, domain, meta, replace)
233
    
234
    @backend_method
235
    def get_account_groups(self, user, account):
236
        """Return a dictionary with the user groups defined for this account."""
237
        
238
        logger.debug("get_account_groups: %s %s", user, account)
239
        if user != account:
240
            if account not in self._allowed_accounts(user):
241
                raise NotAllowedError
242
            return {}
243
        self._lookup_account(account, True)
244
        return self.permissions.group_dict(account)
245
    
246
    @backend_method
247
    def update_account_groups(self, user, account, groups, replace=False):
248
        """Update the groups associated with the account."""
249
        
250
        logger.debug("update_account_groups: %s %s %s %s", user, account, groups, replace)
251
        if user != account:
252
            raise NotAllowedError
253
        self._lookup_account(account, True)
254
        self._check_groups(groups)
255
        if replace:
256
            self.permissions.group_destroy(account)
257
        for k, v in groups.iteritems():
258
            if not replace: # If not already deleted.
259
                self.permissions.group_delete(account, k)
260
            if v:
261
                self.permissions.group_addmany(account, k, v)
262
    
263
    @backend_method
264
    def get_account_policy(self, user, account):
265
        """Return a dictionary with the account policy."""
266
        
267
        logger.debug("get_account_policy: %s %s", user, account)
268
        if user != account:
269
            if account not in self._allowed_accounts(user):
270
                raise NotAllowedError
271
            return {}
272
        path, node = self._lookup_account(account, True)
273
        return self._get_policy(node)
274
    
275
    @backend_method
276
    def update_account_policy(self, user, account, policy, replace=False):
277
        """Update the policy associated with the account."""
278
        
279
        logger.debug("update_account_policy: %s %s %s %s", user, account, policy, replace)
280
        if user != account:
281
            raise NotAllowedError
282
        path, node = self._lookup_account(account, True)
283
        self._check_policy(policy)
284
        self._put_policy(node, policy, replace)
285
    
286
    @backend_method
287
    def put_account(self, user, account, policy={}):
288
        """Create a new account with the given name."""
289
        
290
        logger.debug("put_account: %s %s %s", user, account, policy)
291
        if user != account:
292
            raise NotAllowedError
293
        node = self.node.node_lookup(account)
294
        if node is not None:
295
            raise NameError('Account already exists')
296
        if policy:
297
            self._check_policy(policy)
298
        node = self._put_path(user, self.ROOTNODE, account)
299
        self._put_policy(node, policy, True)
300
    
301
    @backend_method
302
    def delete_account(self, user, account):
303
        """Delete the account with the given name."""
304
        
305
        logger.debug("delete_account: %s %s", user, account)
306
        if user != account:
307
            raise NotAllowedError
308
        node = self.node.node_lookup(account)
309
        if node is None:
310
            return
311
        if not self.node.node_remove(node):
312
            raise IndexError('Account is not empty')
313
        self.permissions.group_destroy(account)
314
    
315
    @backend_method
316
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
317
        """Return a list of containers existing under an account."""
318
        
319
        logger.debug("list_containers: %s %s %s %s %s %s %s", user, account, marker, limit, shared, until, public)
320
        if user != account:
321
            if until or account not in self._allowed_accounts(user):
322
                raise NotAllowedError
323
            allowed = self._allowed_containers(user, account)
324
            start, limit = self._list_limits(allowed, marker, limit)
325
            return allowed[start:start + limit]
326
        if shared or public:
327
            allowed = []
328
            if shared:
329
                allowed.extend([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
330
            if public:
331
                allowed.extend([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
332
            allowed = list(set(allowed))
333
            allowed.sort()
334
            start, limit = self._list_limits(allowed, marker, limit)
335
            return allowed[start:start + limit]
336
        node = self.node.node_lookup(account)
337
        return [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
338
    
339
    @backend_method
340
    def list_container_meta(self, user, account, container, domain, until=None):
341
        """Return a list with all the container's object meta keys for the domain."""
342
        
343
        logger.debug("list_container_meta: %s %s %s %s %s", user, account, container, domain, until)
344
        allowed = []
345
        if user != account:
346
            if until:
347
                raise NotAllowedError
348
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
349
            if not allowed:
350
                raise NotAllowedError
351
        path, node = self._lookup_container(account, container)
352
        before = until if until is not None else inf
353
        allowed = self._get_formatted_paths(allowed)
354
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
355
    
356
    @backend_method
357
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
358
        """Return a dictionary with the container metadata for the domain."""
359
        
360
        logger.debug("get_container_meta: %s %s %s %s %s", user, account, container, domain, until)
361
        if user != account:
362
            if until or container not in self._allowed_containers(user, account):
363
                raise NotAllowedError
364
        path, node = self._lookup_container(account, container)
365
        props = self._get_properties(node, until)
366
        mtime = props[self.MTIME]
367
        count, bytes, tstamp = self._get_statistics(node, until)
368
        tstamp = max(tstamp, mtime)
369
        if until is None:
370
            modified = tstamp
371
        else:
372
            modified = self._get_statistics(node)[2] # Overall last modification.
373
            modified = max(modified, mtime)
374
        
375
        if user != account:
376
            meta = {'name': container}
377
        else:
378
            meta = {}
379
            if include_user_defined:
380
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
381
            if until is not None:
382
                meta.update({'until_timestamp': tstamp})
383
            meta.update({'name': container, 'count': count, 'bytes': bytes})
384
        meta.update({'modified': modified})
385
        return meta
386
    
387
    @backend_method
388
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
389
        """Update the metadata associated with the container for the domain."""
390
        
391
        logger.debug("update_container_meta: %s %s %s %s %s %s", user, account, container, domain, meta, replace)
392
        if user != account:
393
            raise NotAllowedError
394
        path, node = self._lookup_container(account, container)
395
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
396
        if src_version_id is not None:
397
            versioning = self._get_policy(node)['versioning']
398
            if versioning != 'auto':
399
                self.node.version_remove(src_version_id)
400
    
401
    @backend_method
402
    def get_container_policy(self, user, account, container):
403
        """Return a dictionary with the container policy."""
404
        
405
        logger.debug("get_container_policy: %s %s %s", user, account, container)
406
        if user != account:
407
            if container not in self._allowed_containers(user, account):
408
                raise NotAllowedError
409
            return {}
410
        path, node = self._lookup_container(account, container)
411
        return self._get_policy(node)
412
    
413
    @backend_method
414
    def update_container_policy(self, user, account, container, policy, replace=False):
415
        """Update the policy associated with the container."""
416
        
417
        logger.debug("update_container_policy: %s %s %s %s %s", user, account, container, policy, replace)
418
        if user != account:
419
            raise NotAllowedError
420
        path, node = self._lookup_container(account, container)
421
        self._check_policy(policy)
422
        self._put_policy(node, policy, replace)
423
    
424
    @backend_method
425
    def put_container(self, user, account, container, policy={}):
426
        """Create a new container with the given name."""
427
        
428
        logger.debug("put_container: %s %s %s %s", user, account, container, policy)
429
        if user != account:
430
            raise NotAllowedError
431
        try:
432
            path, node = self._lookup_container(account, container)
433
        except NameError:
434
            pass
435
        else:
436
            raise NameError('Container already exists')
437
        if policy:
438
            self._check_policy(policy)
439
        path = '/'.join((account, container))
440
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
441
        self._put_policy(node, policy, True)
442
    
443
    @backend_method
444
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
445
        """Delete/purge the container with the given name."""
446
        
447
        logger.debug("delete_container: %s %s %s %s %s %s", user, account, container, until, prefix, delimiter)
448
        if user != account:
449
            raise NotAllowedError
450
        path, node = self._lookup_container(account, container)
451
        
452
        if until is not None:
453
            hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
454
            for h in hashes:
455
                self.store.map_delete(h)
456
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
457
            self._report_size_change(user, account, -size, {'action': 'container purge'})
458
            return
459
        
460
        if self._get_statistics(node)[0] > 0:
461
            raise IndexError('Container is not empty')
462
        hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
463
        for h in hashes:
464
            self.store.map_delete(h)
465
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
466
        self.node.node_remove(node)
467
        self._report_size_change(user, account, -size, {'action': 'container delete'})
468
    
469
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
470
        if user != account and until:
471
            raise NotAllowedError
472
        allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
473
        if (shared or public) and not allowed:
474
            return []
475
        path, node = self._lookup_container(account, container)
476
        allowed = self._get_formatted_paths(allowed)
477
        return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
478
    
479
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
480
        allowed = []
481
        path = '/'.join((account, container, prefix)).rstrip('/')
482
        if user != account:
483
            allowed = self.permissions.access_list_paths(user, path)
484
            if not allowed:
485
                raise NotAllowedError
486
        else:
487
            allowed = []
488
            if shared:
489
                allowed.extend(self.permissions.access_list_shared(path))
490
            if public:
491
                allowed.extend([x[0] for x in self.permissions.public_list(path)])
492
            allowed = list(set(allowed))
493
            allowed.sort()
494
            if not allowed:
495
                return []
496
        return allowed
497
    
498
    @backend_method
499
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
500
        """Return a list of object (name, version_id) tuples existing under a container."""
501
        
502
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
503
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
504
    
505
    @backend_method
506
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
507
        """Return a list of object metadata dicts existing under a container."""
508
        
509
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
510
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
511
        objects = []
512
        for p in props:
513
            if len(p) == 2:
514
                objects.append({'subdir': p[0]})
515
            else:
516
                objects.append({'name': p[0],
517
                                'bytes': p[self.SIZE + 1],
518
                                'type': p[self.TYPE + 1],
519
                                'hash': p[self.HASH + 1],
520
                                'version': p[self.SERIAL + 1],
521
                                'version_timestamp': p[self.MTIME + 1],
522
                                'modified': p[self.MTIME + 1] if until is None else None,
523
                                'modified_by': p[self.MUSER + 1],
524
                                'uuid': p[self.UUID + 1],
525
                                'checksum': p[self.CHECKSUM + 1]})
526
        return objects
527
    
528
    @backend_method
529
    def list_object_permissions(self, user, account, container, prefix=''):
530
        """Return a list of paths that enforce permissions under a container."""
531
        
532
        logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix)
533
        return self._list_object_permissions(user, account, container, prefix, True, False)
534
    
535
    @backend_method
536
    def list_object_public(self, user, account, container, prefix=''):
537
        """Return a dict mapping paths to public ids for objects that are public under a container."""
538
        
539
        logger.debug("list_object_public: %s %s %s %s", user, account, container, prefix)
540
        public = {}
541
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
542
            public[path] = p + ULTIMATE_ANSWER
543
        return public
544
    
545
    @backend_method
546
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
547
        """Return a dictionary with the object metadata for the domain."""
548
        
549
        logger.debug("get_object_meta: %s %s %s %s %s %s", user, account, container, name, domain, version)
550
        self._can_read(user, account, container, name)
551
        path, node = self._lookup_object(account, container, name)
552
        props = self._get_version(node, version)
553
        if version is None:
554
            modified = props[self.MTIME]
555
        else:
556
            try:
557
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
558
            except NameError: # Object may be deleted.
559
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
560
                if del_props is None:
561
                    raise NameError('Object does not exist')
562
                modified = del_props[self.MTIME]
563
        
564
        meta = {}
565
        if include_user_defined:
566
            meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
567
        meta.update({'name': name,
568
                     'bytes': props[self.SIZE],
569
                     'type': props[self.TYPE],
570
                     'hash': props[self.HASH],
571
                     'version': props[self.SERIAL],
572
                     'version_timestamp': props[self.MTIME],
573
                     'modified': modified,
574
                     'modified_by': props[self.MUSER],
575
                     'uuid': props[self.UUID],
576
                     'checksum': props[self.CHECKSUM]})
577
        return meta
578
    
579
    @backend_method
580
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
581
        """Update the metadata associated with the object for the domain and return the new version."""
582
        
583
        logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace)
584
        self._can_write(user, account, container, name)
585
        path, node = self._lookup_object(account, container, name)
586
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
587
        self._apply_versioning(account, container, src_version_id)
588
        return dest_version_id
589
    
590
    @backend_method
591
    def get_object_permissions(self, user, account, container, name):
592
        """Return the action allowed on the object, the path
593
        from which the object gets its permissions from,
594
        along with a dictionary containing the permissions."""
595
        
596
        logger.debug("get_object_permissions: %s %s %s %s", user, account, container, name)
597
        allowed = 'write'
598
        permissions_path = self._get_permissions_path(account, container, name)
599
        if user != account:
600
            if self.permissions.access_check(permissions_path, self.WRITE, user):
601
                allowed = 'write'
602
            elif self.permissions.access_check(permissions_path, self.READ, user):
603
                allowed = 'read'
604
            else:
605
                raise NotAllowedError
606
        self._lookup_object(account, container, name)
607
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
608
    
609
    @backend_method
610
    def update_object_permissions(self, user, account, container, name, permissions):
611
        """Update the permissions associated with the object."""
612
        
613
        logger.debug("update_object_permissions: %s %s %s %s %s", user, account, container, name, permissions)
614
        if user != account:
615
            raise NotAllowedError
616
        path = self._lookup_object(account, container, name)[0]
617
        self._check_permissions(path, permissions)
618
        self.permissions.access_set(path, permissions)
619
        self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
620
    
621
    @backend_method
622
    def get_object_public(self, user, account, container, name):
623
        """Return the public id of the object if applicable."""
624
        
625
        logger.debug("get_object_public: %s %s %s %s", user, account, container, name)
626
        self._can_read(user, account, container, name)
627
        path = self._lookup_object(account, container, name)[0]
628
        p = self.permissions.public_get(path)
629
        if p is not None:
630
            p += ULTIMATE_ANSWER
631
        return p
632
    
633
    @backend_method
634
    def update_object_public(self, user, account, container, name, public):
635
        """Update the public status of the object."""
636
        
637
        logger.debug("update_object_public: %s %s %s %s %s", user, account, container, name, public)
638
        self._can_write(user, account, container, name)
639
        path = self._lookup_object(account, container, name)[0]
640
        if not public:
641
            self.permissions.public_unset(path)
642
        else:
643
            self.permissions.public_set(path)
644
    
645
    @backend_method
646
    def get_object_hashmap(self, user, account, container, name, version=None):
647
        """Return the object's size and a list with partial hashes."""
648
        
649
        logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version)
650
        self._can_read(user, account, container, name)
651
        path, node = self._lookup_object(account, container, name)
652
        props = self._get_version(node, version)
653
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
654
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
655
    
656
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
657
        if permissions is not None and user != account:
658
            raise NotAllowedError
659
        self._can_write(user, account, container, name)
660
        if permissions is not None:
661
            path = '/'.join((account, container, name))
662
            self._check_permissions(path, permissions)
663
        
664
        account_path, account_node = self._lookup_account(account, True)
665
        container_path, container_node = self._lookup_container(account, container)
666
        path, node = self._put_object_node(container_path, container_node, name)
667
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
668
        
669
        # Handle meta.
670
        if src_version_id is None:
671
            src_version_id = pre_version_id
672
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
673
        
674
        # Check quota.
675
        del_size = self._apply_versioning(account, container, pre_version_id)
676
        size_delta = size - del_size
677
        if size_delta > 0:
678
            account_quota = long(self._get_policy(account_node)['quota'])
679
            container_quota = long(self._get_policy(container_node)['quota'])
680
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
681
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
682
                # This must be executed in a transaction, so the version is never created if it fails.
683
                raise QuotaError
684
        self._report_size_change(user, account, size_delta, {'action': 'object update'})
685
        
686
        if permissions is not None:
687
            self.permissions.access_set(path, permissions)
688
            self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
689
        
690
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
691
        return dest_version_id
692
    
693
    @backend_method
694
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
695
        """Create/update an object with the specified size and partial hashes."""
696
        
697
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user, account, container, name, size, type, hashmap, checksum)
698
        if size == 0: # No such thing as an empty hashmap.
699
            hashmap = [self.put_block('')]
700
        map = HashMap(self.block_size, self.hash_algorithm)
701
        map.extend([binascii.unhexlify(x) for x in hashmap])
702
        missing = self.store.block_search(map)
703
        if missing:
704
            ie = IndexError()
705
            ie.data = [binascii.hexlify(x) for x in missing]
706
            raise ie
707
        
708
        hash = map.hash()
709
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
710
        self.store.map_put(hash, map)
711
        return dest_version_id
712
    
713
    @backend_method
714
    def update_object_checksum(self, user, account, container, name, version, checksum):
715
        """Update an object's checksum."""
716
        
717
        logger.debug("update_object_checksum: %s %s %s %s %s %s", user, account, container, name, version, checksum)
718
        # Update objects with greater version and same hashmap and size (fix metadata updates).
719
        self._can_write(user, account, container, name)
720
        path, node = self._lookup_object(account, container, name)
721
        props = self._get_version(node, version)
722
        versions = self.node.node_get_versions(node)
723
        for x in versions:
724
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
725
                self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
726
    
727
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
728
        self._can_read(user, src_account, src_container, src_name)
729
        path, node = self._lookup_object(src_account, src_container, src_name)
730
        # TODO: Will do another fetch of the properties in duplicate version...
731
        props = self._get_version(node, src_version) # Check to see if source exists.
732
        src_version_id = props[self.SERIAL]
733
        hash = props[self.HASH]
734
        size = props[self.SIZE]
735
        
736
        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
737
        dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy)
738
        return dest_version_id
739
    
740
    @backend_method
741
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
742
        """Copy an object's data and metadata."""
743
        
744
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
745
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False)
746
        return dest_version_id
747
    
748
    @backend_method
749
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
750
        """Move an object's data and metadata."""
751
        
752
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
753
        if user != src_account:
754
            raise NotAllowedError
755
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True)
756
        if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
757
            self._delete_object(user, src_account, src_container, src_name)
758
        return dest_version_id
759
    
760
    def _delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
761
        if user != account:
762
            raise NotAllowedError
763
        
764
        if until is not None:
765
            path = '/'.join((account, container, name))
766
            node = self.node.node_lookup(path)
767
            if node is None:
768
                return
769
            hashes = []
770
            size = 0
771
            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
772
            hashes += h
773
            size += s
774
            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
775
            hashes += h
776
            size += s
777
            for h in hashes:
778
                self.store.map_delete(h)
779
            self.node.node_purge(node, until, CLUSTER_DELETED)
780
            try:
781
                props = self._get_version(node)
782
            except NameError:
783
                self.permissions.access_clear(path)
784
            self._report_size_change(user, account, -size, {'action': 'object purge'})
785
            return
786
        
787
        path, node = self._lookup_object(account, container, name)
788
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
789
        del_size = self._apply_versioning(account, container, src_version_id)
790
        if del_size:
791
            self._report_size_change(user, account, -del_size, {'action': 'object delete'})
792
        self._report_object_change(user, account, path, details={'action': 'object delete'})
793
        self.permissions.access_clear(path)
794
    
795
    @backend_method
796
    def (self, user, account, container, name, until=None, prefix='', delimiter=None):
797
        """Delete/purge an object."""
798
        
799
        logger.debug("delete_object: %s %s %s %s %s %s %s", user, account, container, name, until, prefix, delimiter)
800
        self._delete_object(user, account, container, name, until)
801
    
802
    @backend_method
803
    def list_versions(self, user, account, container, name):
804
        """Return a list of all (version, version_timestamp) tuples for an object."""
805
        
806
        logger.debug("list_versions: %s %s %s %s", user, account, container, name)
807
        self._can_read(user, account, container, name)
808
        path, node = self._lookup_object(account, container, name)
809
        versions = self.node.node_get_versions(node)
810
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
811
    
812
    @backend_method
813
    def get_uuid(self, user, uuid):
814
        """Return the (account, container, name) for the UUID given."""
815
        
816
        logger.debug("get_uuid: %s %s", user, uuid)
817
        info = self.node.latest_uuid(uuid)
818
        if info is None:
819
            raise NameError
820
        path, serial = info
821
        account, container, name = path.split('/', 2)
822
        self._can_read(user, account, container, name)
823
        return (account, container, name)
824
    
825
    @backend_method
826
    def get_public(self, user, public):
827
        """Return the (account, container, name) for the public id given."""
828
        
829
        logger.debug("get_public: %s %s", user, public)
830
        if public is None or public < ULTIMATE_ANSWER:
831
            raise NameError
832
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
833
        if path is None:
834
            raise NameError
835
        account, container, name = path.split('/', 2)
836
        self._can_read(user, account, container, name)
837
        return (account, container, name)
838
    
839
    @backend_method(autocommit=0)
840
    def get_block(self, hash):
841
        """Return a block's data."""
842
        
843
        logger.debug("get_block: %s", hash)
844
        block = self.store.block_get(binascii.unhexlify(hash))
845
        if not block:
846
            raise NameError('Block does not exist')
847
        return block
848
    
849
    @backend_method(autocommit=0)
850
    def put_block(self, data):
851
        """Store a block and return the hash."""
852
        
853
        logger.debug("put_block: %s", len(data))
854
        return binascii.hexlify(self.store.block_put(data))
855
    
856
    @backend_method(autocommit=0)
857
    def update_block(self, hash, data, offset=0):
858
        """Update a known block and return the hash."""
859
        
860
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
861
        if offset == 0 and len(data) == self.block_size:
862
            return self.put_block(data)
863
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
864
        return binascii.hexlify(h)
865
    
866
    # Path functions.
867
    
868
    def _generate_uuid(self):
869
        return str(uuidlib.uuid4())
870
    
871
    def _put_object_node(self, path, parent, name):
872
        path = '/'.join((path, name))
873
        node = self.node.node_lookup(path)
874
        if node is None:
875
            node = self.node.node_create(parent, path)
876
        return path, node
877
    
878
    def _put_path(self, user, parent, path):
879
        node = self.node.node_create(parent, path)
880
        self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
881
        return node
882
    
883
    def _lookup_account(self, account, create=True):
884
        node = self.node.node_lookup(account)
885
        if node is None and create:
886
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
887
        return account, node
888
    
889
    def _lookup_container(self, account, container):
890
        path = '/'.join((account, container))
891
        node = self.node.node_lookup(path)
892
        if node is None:
893
            raise NameError('Container does not exist')
894
        return path, node
895
    
896
    def _lookup_object(self, account, container, name):
897
        path = '/'.join((account, container, name))
898
        node = self.node.node_lookup(path)
899
        if node is None:
900
            raise NameError('Object does not exist')
901
        return path, node
902
    
903
    def _get_properties(self, node, until=None):
904
        """Return properties until the timestamp given."""
905
        
906
        before = until if until is not None else inf
907
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
908
        if props is None and until is not None:
909
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
910
        if props is None:
911
            raise NameError('Path does not exist')
912
        return props
913
    
914
    def _get_statistics(self, node, until=None):
915
        """Return count, sum of size and latest timestamp of everything under node."""
916
        
917
        if until is None:
918
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
919
        else:
920
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
921
        if stats is None:
922
            stats = (0, 0, 0)
923
        return stats
924
    
925
    def _get_version(self, node, version=None):
926
        if version is None:
927
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
928
            if props is None:
929
                raise NameError('Object does not exist')
930
        else:
931
            try:
932
                version = int(version)
933
            except ValueError:
934
                raise IndexError('Version does not exist')
935
            props = self.node.version_get_properties(version)
936
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
937
                raise IndexError('Version does not exist')
938
        return props
939
    
940
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
941
        """Create a new version of the node."""
942
        
943
        props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
944
        if props is not None:
945
            src_version_id = props[self.SERIAL]
946
            src_hash = props[self.HASH]
947
            src_size = props[self.SIZE]
948
            src_type = props[self.TYPE]
949
            src_checksum = props[self.CHECKSUM]
950
        else:
951
            src_version_id = None
952
            src_hash = None
953
            src_size = 0
954
            src_type = ''
955
            src_checksum = ''
956
        if size is None: # Set metadata.
957
            hash = src_hash # This way hash can be set to None (account or container).
958
            size = src_size
959
        if type is None:
960
            type = src_type
961
        if checksum is None:
962
            checksum = src_checksum
963
        uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
964
        
965
        if src_node is None:
966
            pre_version_id = src_version_id
967
        else:
968
            pre_version_id = None
969
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
970
            if props is not None:
971
                pre_version_id = props[self.SERIAL]
972
        if pre_version_id is not None:
973
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
974
        
975
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
976
        return pre_version_id, dest_version_id
977
    
978
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
979
        if src_version_id is not None:
980
            self.node.attribute_copy(src_version_id, dest_version_id)
981
        if not replace:
982
            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
983
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
984
        else:
985
            self.node.attribute_del(dest_version_id, domain)
986
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
987
    
988
    def _put_metadata(self, user, node, domain, meta, replace=False):
989
        """Create a new version and store metadata."""
990
        
991
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
992
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
993
        return src_version_id, dest_version_id
994
    
995
    def _list_limits(self, listing, marker, limit):
996
        start = 0
997
        if marker:
998
            try:
999
                start = listing.index(marker) + 1
1000
            except ValueError:
1001
                pass
1002
        if not limit or limit > 10000:
1003
            limit = 10000
1004
        return start, limit
1005
    
1006
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1007
        cont_prefix = path + '/'
1008
        prefix = cont_prefix + prefix
1009
        start = cont_prefix + marker if marker else None
1010
        before = until if until is not None else inf
1011
        filterq = keys if domain else []
1012
        sizeq = size_range
1013
        
1014
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1015
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1016
        objects.sort(key=lambda x: x[0])
1017
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1018
        
1019
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
1020
        return objects[start:start + limit]
1021
    
1022
    # Reporting functions.
1023
    
1024
    def _report_size_change(self, user, account, size, details={}):
1025
        logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1026
        account_node = self._lookup_account(account, True)[1]
1027
        total = self._get_statistics(account_node)[1]
1028
        details.update({'user': user, 'total': total})
1029
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1030
    
1031
    def _report_object_change(self, user, account, path, details={}):
1032
        logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1033
        details.update({'user': user})
1034
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1035
    
1036
    def _report_sharing_change(self, user, account, path, details={}):
1037
        logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1038
        details.update({'user': user})
1039
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1040
    
1041
    # Policy functions.
1042
    
1043
    def _check_policy(self, policy):
1044
        for k in policy.keys():
1045
            if policy[k] == '':
1046
                policy[k] = self.default_policy.get(k)
1047
        for k, v in policy.iteritems():
1048
            if k == 'quota':
1049
                q = int(v) # May raise ValueError.
1050
                if q < 0:
1051
                    raise ValueError
1052
            elif k == 'versioning':
1053
                if v not in ['auto', 'none']:
1054
                    raise ValueError
1055
            else:
1056
                raise ValueError
1057
    
1058
    def _put_policy(self, node, policy, replace):
1059
        if replace:
1060
            for k, v in self.default_policy.iteritems():
1061
                if k not in policy:
1062
                    policy[k] = v
1063
        self.node.policy_set(node, policy)
1064
    
1065
    def _get_policy(self, node):
1066
        policy = self.default_policy.copy()
1067
        policy.update(self.node.policy_get(node))
1068
        return policy
1069
    
1070
    def _apply_versioning(self, account, container, version_id):
1071
        """Delete the provided version if such is the policy.
1072
           Return size of object removed.
1073
        """
1074
        
1075
        if version_id is None:
1076
            return 0
1077
        path, node = self._lookup_container(account, container)
1078
        versioning = self._get_policy(node)['versioning']
1079
        if versioning != 'auto':
1080
            hash, size = self.node.version_remove(version_id)
1081
            self.store.map_delete(hash)
1082
            return size
1083
        return 0
1084
    
1085
    # Access control functions.
1086
    
1087
    def _check_groups(self, groups):
1088
        # raise ValueError('Bad characters in groups')
1089
        pass
1090
    
1091
    def _check_permissions(self, path, permissions):
1092
        # raise ValueError('Bad characters in permissions')
1093
        pass
1094
    
1095
    def _get_formatted_paths(self, paths):
1096
        formatted = []
1097
        for p in paths:
1098
            node = self.node.node_lookup(p)
1099
            if node is not None:
1100
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1101
            if props is not None:
1102
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1103
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1104
                formatted.append((p, self.MATCH_EXACT))
1105
        return formatted
1106
    
1107
    def _get_permissions_path(self, account, container, name):
1108
        path = '/'.join((account, container, name))
1109
        permission_paths = self.permissions.access_inherit(path)
1110
        permission_paths.sort()
1111
        permission_paths.reverse()
1112
        for p in permission_paths:
1113
            if p == path:
1114
                return p
1115
            else:
1116
                if p.count('/') < 2:
1117
                    continue
1118
                node = self.node.node_lookup(p)
1119
                if node is not None:
1120
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1121
                if props is not None:
1122
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1123
                        return p
1124
        return None
1125
    
1126
    def _can_read(self, user, account, container, name):
1127
        if user == account:
1128
            return True
1129
        path = '/'.join((account, container, name))
1130
        if self.permissions.public_get(path) is not None:
1131
            return True
1132
        path = self._get_permissions_path(account, container, name)
1133
        if not path:
1134
            raise NotAllowedError
1135
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1136
            raise NotAllowedError
1137
    
1138
    def _can_write(self, user, account, container, name):
1139
        if user == account:
1140
            return True
1141
        path = '/'.join((account, container, name))
1142
        path = self._get_permissions_path(account, container, name)
1143
        if not path:
1144
            raise NotAllowedError
1145
        if not self.permissions.access_check(path, self.WRITE, user):
1146
            raise NotAllowedError
1147
    
1148
    def _allowed_accounts(self, user):
1149
        allow = set()
1150
        for path in self.permissions.access_list_paths(user):
1151
            allow.add(path.split('/', 1)[0])
1152
        return sorted(allow)
1153
    
1154
    def _allowed_containers(self, user, account):
1155
        allow = set()
1156
        for path in self.permissions.access_list_paths(user, account):
1157
            allow.add(path.split('/', 2)[1])
1158
        return sorted(allow)