Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 56ac7c81

History | View | Annotate | Download (57 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43

    
44
# Stripped-down version of the HashMap class found in tools.
45
class HashMap(list):
46

    
47
    def __init__(self, blocksize, blockhash):
48
        super(HashMap, self).__init__()
49
        self.blocksize = blocksize
50
        self.blockhash = blockhash
51

    
52
    def _hash_raw(self, v):
53
        h = hashlib.new(self.blockhash)
54
        h.update(v)
55
        return h.digest()
56

    
57
    def hash(self):
58
        if len(self) == 0:
59
            return self._hash_raw('')
60
        if len(self) == 1:
61
            return self.__getitem__(0)
62

    
63
        h = list(self)
64
        s = 2
65
        while s < len(h):
66
            s = s * 2
67
        h += [('\x00' * len(h[0]))] * (s - len(h))
68
        while len(h) > 1:
69
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
70
        return h[0]
71

    
72
# Default modules and settings.
73
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
74
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
75
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
76
DEFAULT_BLOCK_PATH = 'data/'
77
DEFAULT_BLOCK_UMASK = 0o022
78
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
79
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
80

    
81
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
82
QUEUE_CLIENT_ID = 'pithos'
83
QUEUE_INSTANCE_ID = '1'
84

    
85
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
86

    
87
inf = float('inf')
88

    
89
ULTIMATE_ANSWER = 42
90

    
91

    
92
logger = logging.getLogger(__name__)
93

    
94

    
95
def backend_method(func=None, autocommit=1):
96
    if func is None:
97
        def fn(func):
98
            return backend_method(func, autocommit)
99
        return fn
100

    
101
    if not autocommit:
102
        return func
103
    def fn(self, *args, **kw):
104
        self.wrapper.execute()
105
        try:
106
            self.messages = []
107
            ret = func(self, *args, **kw)
108
            for m in self.messages:
109
                self.queue.send(*m)
110
            self.wrapper.commit()
111
            return ret
112
        except:
113
            self.wrapper.rollback()
114
            raise
115
    return fn
116

    
117

    
118
class ModularBackend(BaseBackend):
119
    """A modular backend.
120
    
121
    Uses modules for SQL functions and storage.
122
    """
123
    
124
    def __init__(self, db_module=None, db_connection=None,
125
                 block_module=None, block_path=None, block_umask=None,
126
                 queue_module=None, queue_connection=None):
127
        db_module = db_module or DEFAULT_DB_MODULE
128
        db_connection = db_connection or DEFAULT_DB_CONNECTION
129
        block_module = block_module or DEFAULT_BLOCK_MODULE
130
        block_path = block_path or DEFAULT_BLOCK_PATH
131
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
132
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
133
        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
134
        
135
        self.hash_algorithm = 'sha256'
136
        self.block_size = 4 * 1024 * 1024 # 4MB
137
        
138
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
139
        
140
        def load_module(m):
141
            __import__(m)
142
            return sys.modules[m]
143
        
144
        self.db_module = load_module(db_module)
145
        self.wrapper = self.db_module.DBWrapper(db_connection)
146
        params = {'wrapper': self.wrapper}
147
        self.permissions = self.db_module.Permissions(**params)
148
        for x in ['READ', 'WRITE']:
149
            setattr(self, x, getattr(self.db_module, x))
150
        self.node = self.db_module.Node(**params)
151
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
152
            setattr(self, x, getattr(self.db_module, x))
153
        
154
        self.block_module = load_module(block_module)
155
        params = {'path': block_path,
156
                  'block_size': self.block_size,
157
                  'hash_algorithm': self.hash_algorithm,
158
                  'umask': block_umask}
159
        self.store = self.block_module.Store(**params)
160

    
161
        if queue_module and queue_connection:
162
            self.queue_module = load_module(queue_module)
163
            params = {'exchange': queue_connection,
164
                      'client_id': QUEUE_CLIENT_ID}
165
            self.queue = self.queue_module.Queue(**params)
166
        else:
167
            class NoQueue:
168
                def send(self, *args):
169
                    pass
170
                
171
                def close(self):
172
                    pass
173
            
174
            self.queue = NoQueue()
175
    
176
    def close(self):
177
        self.wrapper.close()
178
        self.queue.close()
179
    
180
    @backend_method
181
    def list_accounts(self, user, marker=None, limit=10000):
182
        """Return a list of accounts the user can access."""
183
        
184
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
185
        allowed = self._allowed_accounts(user)
186
        start, limit = self._list_limits(allowed, marker, limit)
187
        return allowed[start:start + limit]
188
    
189
    @backend_method
190
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
191
        """Return a dictionary with the account metadata for the domain."""
192
        
193
        logger.debug("get_account_meta: %s %s %s %s", user, account, domain, until)
194
        path, node = self._lookup_account(account, user == account)
195
        if user != account:
196
            if until or node is None or account not in self._allowed_accounts(user):
197
                raise NotAllowedError
198
        try:
199
            props = self._get_properties(node, until)
200
            mtime = props[self.MTIME]
201
        except NameError:
202
            props = None
203
            mtime = until
204
        count, bytes, tstamp = self._get_statistics(node, until)
205
        tstamp = max(tstamp, mtime)
206
        if until is None:
207
            modified = tstamp
208
        else:
209
            modified = self._get_statistics(node)[2] # Overall last modification.
210
            modified = max(modified, mtime)
211
        
212
        if user != account:
213
            meta = {'name': account}
214
        else:
215
            meta = {}
216
            if props is not None and include_user_defined:
217
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
218
            if until is not None:
219
                meta.update({'until_timestamp': tstamp})
220
            meta.update({'name': account, 'count': count, 'bytes': bytes})
221
        meta.update({'modified': modified})
222
        return meta
223
    
224
    @backend_method
225
    def update_account_meta(self, user, account, domain, meta, replace=False):
226
        """Update the metadata associated with the account for the domain."""
227
        
228
        logger.debug("update_account_meta: %s %s %s %s %s", user, account, domain, meta, replace)
229
        if user != account:
230
            raise NotAllowedError
231
        path, node = self._lookup_account(account, True)
232
        self._put_metadata(user, node, domain, meta, replace)
233
    
234
    @backend_method
235
    def get_account_groups(self, user, account):
236
        """Return a dictionary with the user groups defined for this account."""
237
        
238
        logger.debug("get_account_groups: %s %s", user, account)
239
        if user != account:
240
            if account not in self._allowed_accounts(user):
241
                raise NotAllowedError
242
            return {}
243
        self._lookup_account(account, True)
244
        return self.permissions.group_dict(account)
245
    
246
    @backend_method
247
    def update_account_groups(self, user, account, groups, replace=False):
248
        """Update the groups associated with the account."""
249
        
250
        logger.debug("update_account_groups: %s %s %s %s", user, account, groups, replace)
251
        if user != account:
252
            raise NotAllowedError
253
        self._lookup_account(account, True)
254
        self._check_groups(groups)
255
        if replace:
256
            self.permissions.group_destroy(account)
257
        for k, v in groups.iteritems():
258
            if not replace: # If not already deleted.
259
                self.permissions.group_delete(account, k)
260
            if v:
261
                self.permissions.group_addmany(account, k, v)
262
    
263
    @backend_method
264
    def get_account_policy(self, user, account):
265
        """Return a dictionary with the account policy."""
266
        
267
        logger.debug("get_account_policy: %s %s", user, account)
268
        if user != account:
269
            if account not in self._allowed_accounts(user):
270
                raise NotAllowedError
271
            return {}
272
        path, node = self._lookup_account(account, True)
273
        return self._get_policy(node)
274
    
275
    @backend_method
276
    def update_account_policy(self, user, account, policy, replace=False):
277
        """Update the policy associated with the account."""
278
        
279
        logger.debug("update_account_policy: %s %s %s %s", user, account, policy, replace)
280
        if user != account:
281
            raise NotAllowedError
282
        path, node = self._lookup_account(account, True)
283
        self._check_policy(policy)
284
        self._put_policy(node, policy, replace)
285
    
286
    @backend_method
287
    def put_account(self, user, account, policy={}):
288
        """Create a new account with the given name."""
289
        
290
        logger.debug("put_account: %s %s %s", user, account, policy)
291
        if user != account:
292
            raise NotAllowedError
293
        node = self.node.node_lookup(account)
294
        if node is not None:
295
            raise NameError('Account already exists')
296
        if policy:
297
            self._check_policy(policy)
298
        node = self._put_path(user, self.ROOTNODE, account)
299
        self._put_policy(node, policy, True)
300
    
301
    @backend_method
302
    def delete_account(self, user, account):
303
        """Delete the account with the given name."""
304
        
305
        logger.debug("delete_account: %s %s", user, account)
306
        if user != account:
307
            raise NotAllowedError
308
        node = self.node.node_lookup(account)
309
        if node is None:
310
            return
311
        if not self.node.node_remove(node):
312
            raise IndexError('Account is not empty')
313
        self.permissions.group_destroy(account)
314
    
315
    @backend_method
316
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
317
        """Return a list of containers existing under an account."""
318
        
319
        logger.debug("list_containers: %s %s %s %s %s %s %s", user, account, marker, limit, shared, until, public)
320
        if user != account:
321
            if until or account not in self._allowed_accounts(user):
322
                raise NotAllowedError
323
            allowed = self._allowed_containers(user, account)
324
            start, limit = self._list_limits(allowed, marker, limit)
325
            return allowed[start:start + limit]
326
        if shared or public:
327
            allowed = set()
328
            if shared:
329
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
330
            if public:
331
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
332
            allowed = sorted(allowed)
333
            start, limit = self._list_limits(allowed, marker, limit)
334
            return allowed[start:start + limit]
335
        node = self.node.node_lookup(account)
336
        containers = [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
337
        start, limit = self._list_limits([x[0] for x in containers], marker, limit)
338
        return containers[start:start + limit]
339
    
340
    @backend_method
341
    def list_container_meta(self, user, account, container, domain, until=None):
342
        """Return a list with all the container's object meta keys for the domain."""
343
        
344
        logger.debug("list_container_meta: %s %s %s %s %s", user, account, container, domain, until)
345
        allowed = []
346
        if user != account:
347
            if until:
348
                raise NotAllowedError
349
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
350
            if not allowed:
351
                raise NotAllowedError
352
        path, node = self._lookup_container(account, container)
353
        before = until if until is not None else inf
354
        allowed = self._get_formatted_paths(allowed)
355
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
356
    
357
    @backend_method
358
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
359
        """Return a dictionary with the container metadata for the domain."""
360
        
361
        logger.debug("get_container_meta: %s %s %s %s %s", user, account, container, domain, until)
362
        if user != account:
363
            if until or container not in self._allowed_containers(user, account):
364
                raise NotAllowedError
365
        path, node = self._lookup_container(account, container)
366
        props = self._get_properties(node, until)
367
        mtime = props[self.MTIME]
368
        count, bytes, tstamp = self._get_statistics(node, until)
369
        tstamp = max(tstamp, mtime)
370
        if until is None:
371
            modified = tstamp
372
        else:
373
            modified = self._get_statistics(node)[2] # Overall last modification.
374
            modified = max(modified, mtime)
375
        
376
        if user != account:
377
            meta = {'name': container}
378
        else:
379
            meta = {}
380
            if include_user_defined:
381
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
382
            if until is not None:
383
                meta.update({'until_timestamp': tstamp})
384
            meta.update({'name': container, 'count': count, 'bytes': bytes})
385
        meta.update({'modified': modified})
386
        return meta
387
    
388
    @backend_method
389
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
390
        """Update the metadata associated with the container for the domain."""
391
        
392
        logger.debug("update_container_meta: %s %s %s %s %s %s", user, account, container, domain, meta, replace)
393
        if user != account:
394
            raise NotAllowedError
395
        path, node = self._lookup_container(account, container)
396
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
397
        if src_version_id is not None:
398
            versioning = self._get_policy(node)['versioning']
399
            if versioning != 'auto':
400
                self.node.version_remove(src_version_id)
401
    
402
    @backend_method
403
    def get_container_policy(self, user, account, container):
404
        """Return a dictionary with the container policy."""
405
        
406
        logger.debug("get_container_policy: %s %s %s", user, account, container)
407
        if user != account:
408
            if container not in self._allowed_containers(user, account):
409
                raise NotAllowedError
410
            return {}
411
        path, node = self._lookup_container(account, container)
412
        return self._get_policy(node)
413
    
414
    @backend_method
415
    def update_container_policy(self, user, account, container, policy, replace=False):
416
        """Update the policy associated with the container."""
417
        
418
        logger.debug("update_container_policy: %s %s %s %s %s", user, account, container, policy, replace)
419
        if user != account:
420
            raise NotAllowedError
421
        path, node = self._lookup_container(account, container)
422
        self._check_policy(policy)
423
        self._put_policy(node, policy, replace)
424
    
425
    @backend_method
426
    def put_container(self, user, account, container, policy={}):
427
        """Create a new container with the given name."""
428
        
429
        logger.debug("put_container: %s %s %s %s", user, account, container, policy)
430
        if user != account:
431
            raise NotAllowedError
432
        try:
433
            path, node = self._lookup_container(account, container)
434
        except NameError:
435
            pass
436
        else:
437
            raise NameError('Container already exists')
438
        if policy:
439
            self._check_policy(policy)
440
        path = '/'.join((account, container))
441
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
442
        self._put_policy(node, policy, True)
443
    
444
    @backend_method
445
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
446
        """Delete/purge the container with the given name."""
447
        
448
        logger.debug("delete_container: %s %s %s %s %s %s", user, account, container, until, prefix, delimiter)
449
        if user != account:
450
            raise NotAllowedError
451
        path, node = self._lookup_container(account, container)
452
        
453
        if until is not None:
454
            hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
455
            for h in hashes:
456
                self.store.map_delete(h)
457
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
458
            self._report_size_change(user, account, -size, {'action': 'container purge'})
459
            return
460
        
461
        if self._get_statistics(node)[0] > 0:
462
            raise IndexError('Container is not empty')
463
        hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
464
        for h in hashes:
465
            self.store.map_delete(h)
466
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
467
        self.node.node_remove(node)
468
        self._report_size_change(user, account, -size, {'action': 'container delete'})
469
    
470
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
471
        if user != account and until:
472
            raise NotAllowedError
473
        if shared and public:
474
            # get shared first
475
            shared = self._list_object_permissions(user, account, container, prefix, shared=True, public=False)
476
            objects = []
477
            if shared:
478
                path, node = self._lookup_container(account, container)
479
                shared = self._get_formatted_paths(shared)
480
                objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props)
481
            
482
            # get public
483
            objects.extend(self._list_public_object_properties(user, account, container, prefix, all_props))
484
            
485
            objects.sort(key=lambda x: x[0])
486
            start, limit = self._list_limits([x[0] for x in objects], marker, limit)
487
            return objects[start:start + limit]
488
        elif public:
489
            objects = self._list_public_object_properties(user, account, container, prefix, all_props)
490
            start, limit = self._list_limits([x[0] for x in objects], marker, limit)
491
            return objects[start:start + limit]
492
        
493
        allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
494
        if shared and not allowed:
495
            return []
496
        path, node = self._lookup_container(account, container)
497
        allowed = self._get_formatted_paths(allowed)
498
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
499
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
500
        return objects[start:start + limit]
501
    
502
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
503
        public = self._list_object_permissions(user, account, container, prefix, shared=False, public=True)
504
        paths, nodes = self._lookup_objects(public)
505
        path = '/'.join((account, container))
506
        cont_prefix = path + '/'
507
        paths = [x[len(cont_prefix):] for x in paths]
508
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
509
        objects = [(path,) + props for path, props in zip(paths, props)]
510
        return objects
511
        
512
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
513
        objects = []
514
        while True:
515
            marker = objects[-1] if objects else None
516
            limit = 10000
517
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
518
            objects.extend(l)
519
            if not l or len(l) < limit:
520
                break
521
        return objects
522
    
523
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
524
        allowed = []
525
        path = '/'.join((account, container, prefix)).rstrip('/')
526
        if user != account:
527
            allowed = self.permissions.access_list_paths(user, path)
528
            if not allowed:
529
                raise NotAllowedError
530
        else:
531
            allowed = set()
532
            if shared:
533
                allowed.update(self.permissions.access_list_shared(path))
534
            if public:
535
                allowed.update([x[0] for x in self.permissions.public_list(path)])
536
            allowed = sorted(allowed)
537
            if not allowed:
538
                return []
539
        return allowed
540
    
541
    @backend_method
542
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
543
        """Return a list of object (name, version_id) tuples existing under a container."""
544
        
545
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
546
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
547
    
548
    @backend_method
549
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
550
        """Return a list of object metadata dicts existing under a container."""
551
        
552
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
553
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
554
        objects = []
555
        for p in props:
556
            if len(p) == 2:
557
                objects.append({'subdir': p[0]})
558
            else:
559
                objects.append({'name': p[0],
560
                                'bytes': p[self.SIZE + 1],
561
                                'type': p[self.TYPE + 1],
562
                                'hash': p[self.HASH + 1],
563
                                'version': p[self.SERIAL + 1],
564
                                'version_timestamp': p[self.MTIME + 1],
565
                                'modified': p[self.MTIME + 1] if until is None else None,
566
                                'modified_by': p[self.MUSER + 1],
567
                                'uuid': p[self.UUID + 1],
568
                                'checksum': p[self.CHECKSUM + 1]})
569
        return objects
570
    
571
    @backend_method
572
    def list_object_permissions(self, user, account, container, prefix=''):
573
        """Return a list of paths that enforce permissions under a container."""
574
        
575
        logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix)
576
        return self._list_object_permissions(user, account, container, prefix, True, False)
577
    
578
    @backend_method
579
    def list_object_public(self, user, account, container, prefix=''):
580
        """Return a dict mapping paths to public ids for objects that are public under a container."""
581
        
582
        logger.debug("list_object_public: %s %s %s %s", user, account, container, prefix)
583
        public = {}
584
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
585
            public[path] = p + ULTIMATE_ANSWER
586
        return public
587
    
588
    @backend_method
589
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
590
        """Return a dictionary with the object metadata for the domain."""
591
        
592
        logger.debug("get_object_meta: %s %s %s %s %s %s", user, account, container, name, domain, version)
593
        self._can_read(user, account, container, name)
594
        path, node = self._lookup_object(account, container, name)
595
        props = self._get_version(node, version)
596
        if version is None:
597
            modified = props[self.MTIME]
598
        else:
599
            try:
600
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
601
            except NameError: # Object may be deleted.
602
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
603
                if del_props is None:
604
                    raise NameError('Object does not exist')
605
                modified = del_props[self.MTIME]
606
        
607
        meta = {}
608
        if include_user_defined:
609
            meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
610
        meta.update({'name': name,
611
                     'bytes': props[self.SIZE],
612
                     'type': props[self.TYPE],
613
                     'hash': props[self.HASH],
614
                     'version': props[self.SERIAL],
615
                     'version_timestamp': props[self.MTIME],
616
                     'modified': modified,
617
                     'modified_by': props[self.MUSER],
618
                     'uuid': props[self.UUID],
619
                     'checksum': props[self.CHECKSUM]})
620
        return meta
621
    
622
    @backend_method
623
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
624
        """Update the metadata associated with the object for the domain and return the new version."""
625
        
626
        logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace)
627
        self._can_write(user, account, container, name)
628
        path, node = self._lookup_object(account, container, name)
629
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
630
        self._apply_versioning(account, container, src_version_id)
631
        return dest_version_id
632
    
633
    @backend_method
634
    def get_object_permissions(self, user, account, container, name):
635
        """Return the action allowed on the object, the path
636
        from which the object gets its permissions from,
637
        along with a dictionary containing the permissions."""
638
        
639
        logger.debug("get_object_permissions: %s %s %s %s", user, account, container, name)
640
        allowed = 'write'
641
        permissions_path = self._get_permissions_path(account, container, name)
642
        if user != account:
643
            if self.permissions.access_check(permissions_path, self.WRITE, user):
644
                allowed = 'write'
645
            elif self.permissions.access_check(permissions_path, self.READ, user):
646
                allowed = 'read'
647
            else:
648
                raise NotAllowedError
649
        self._lookup_object(account, container, name)
650
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
651
    
652
    @backend_method
653
    def update_object_permissions(self, user, account, container, name, permissions):
654
        """Update the permissions associated with the object."""
655
        
656
        logger.debug("update_object_permissions: %s %s %s %s %s", user, account, container, name, permissions)
657
        if user != account:
658
            raise NotAllowedError
659
        path = self._lookup_object(account, container, name)[0]
660
        self._check_permissions(path, permissions)
661
        self.permissions.access_set(path, permissions)
662
        self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
663
    
664
    @backend_method
665
    def get_object_public(self, user, account, container, name):
666
        """Return the public id of the object if applicable."""
667
        
668
        logger.debug("get_object_public: %s %s %s %s", user, account, container, name)
669
        self._can_read(user, account, container, name)
670
        path = self._lookup_object(account, container, name)[0]
671
        p = self.permissions.public_get(path)
672
        if p is not None:
673
            p += ULTIMATE_ANSWER
674
        return p
675
    
676
    @backend_method
677
    def update_object_public(self, user, account, container, name, public):
678
        """Update the public status of the object."""
679
        
680
        logger.debug("update_object_public: %s %s %s %s %s", user, account, container, name, public)
681
        self._can_write(user, account, container, name)
682
        path = self._lookup_object(account, container, name)[0]
683
        if not public:
684
            self.permissions.public_unset(path)
685
        else:
686
            self.permissions.public_set(path)
687
    
688
    @backend_method
689
    def get_object_hashmap(self, user, account, container, name, version=None):
690
        """Return the object's size and a list with partial hashes."""
691
        
692
        logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version)
693
        self._can_read(user, account, container, name)
694
        path, node = self._lookup_object(account, container, name)
695
        props = self._get_version(node, version)
696
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
697
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
698
    
699
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
700
        if permissions is not None and user != account:
701
            raise NotAllowedError
702
        self._can_write(user, account, container, name)
703
        if permissions is not None:
704
            path = '/'.join((account, container, name))
705
            self._check_permissions(path, permissions)
706
        
707
        account_path, account_node = self._lookup_account(account, True)
708
        container_path, container_node = self._lookup_container(account, container)
709
        path, node = self._put_object_node(container_path, container_node, name)
710
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
711
        
712
        # Handle meta.
713
        if src_version_id is None:
714
            src_version_id = pre_version_id
715
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
716
        
717
        # Check quota.
718
        del_size = self._apply_versioning(account, container, pre_version_id)
719
        size_delta = size - del_size
720
        if size_delta > 0:
721
            account_quota = long(self._get_policy(account_node)['quota'])
722
            container_quota = long(self._get_policy(container_node)['quota'])
723
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
724
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
725
                # This must be executed in a transaction, so the version is never created if it fails.
726
                raise QuotaError
727
        self._report_size_change(user, account, size_delta, {'action': 'object update'})
728
        
729
        if permissions is not None:
730
            self.permissions.access_set(path, permissions)
731
            self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
732
        
733
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
734
        return dest_version_id
735
    
736
    @backend_method
737
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
738
        """Create/update an object with the specified size and partial hashes."""
739
        
740
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user, account, container, name, size, type, hashmap, checksum)
741
        if size == 0: # No such thing as an empty hashmap.
742
            hashmap = [self.put_block('')]
743
        map = HashMap(self.block_size, self.hash_algorithm)
744
        map.extend([binascii.unhexlify(x) for x in hashmap])
745
        missing = self.store.block_search(map)
746
        if missing:
747
            ie = IndexError()
748
            ie.data = [binascii.hexlify(x) for x in missing]
749
            raise ie
750
        
751
        hash = map.hash()
752
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
753
        self.store.map_put(hash, map)
754
        return dest_version_id
755
    
756
    @backend_method
757
    def update_object_checksum(self, user, account, container, name, version, checksum):
758
        """Update an object's checksum."""
759
        
760
        logger.debug("update_object_checksum: %s %s %s %s %s %s", user, account, container, name, version, checksum)
761
        # Update objects with greater version and same hashmap and size (fix metadata updates).
762
        self._can_write(user, account, container, name)
763
        path, node = self._lookup_object(account, container, name)
764
        props = self._get_version(node, version)
765
        versions = self.node.node_get_versions(node)
766
        for x in versions:
767
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
768
                self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
769
    
770
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
771
        dest_version_ids = []
772
        self._can_read(user, src_account, src_container, src_name)
773
        path, node = self._lookup_object(src_account, src_container, src_name)
774
        # TODO: Will do another fetch of the properties in duplicate version...
775
        props = self._get_version(node, src_version) # Check to see if source exists.
776
        src_version_id = props[self.SERIAL]
777
        hash = props[self.HASH]
778
        size = props[self.SIZE]
779
        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
780
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
781
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
782
                self._delete_object(user, src_account, src_container, src_name)
783
        
784
        if delimiter:
785
            prefix = src_name + delimiter if not src_name.endswith(delimiter) else src_name
786
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
787
            paths = [elem[0] for elem in src_names]
788
            nodes = [elem[2] for elem in src_names]
789
            # TODO: Will do another fetch of the properties in duplicate version...
790
            props = self._get_versions(nodes) # Check to see if source exists.
791
            
792
            for prop, path, node in zip(props, paths, nodes):
793
                src_version_id = prop[self.SERIAL]
794
                hash = prop[self.HASH]
795
                vtype = prop[self.TYPE]
796
                dest_prefix = dest_name + delimiter if not dest_name.endswith(delimiter) else dest_name
797
                vdest_name = path.replace(prefix, dest_prefix, 1)
798
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
799
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
800
                        self._delete_object(user, src_account, src_container, path)
801
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
802
    
803
    @backend_method
804
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
805
        """Copy an object's data and metadata."""
806
        
807
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
808
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
809
        return dest_version_id
810
    
811
    @backend_method
812
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
813
        """Move an object's data and metadata."""
814
        
815
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
816
        if user != src_account:
817
            raise NotAllowedError
818
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
819
        return dest_version_id
820
    
821
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
822
        if user != account:
823
            raise NotAllowedError
824
        
825
        if until is not None:
826
            path = '/'.join((account, container, name))
827
            node = self.node.node_lookup(path)
828
            if node is None:
829
                return
830
            hashes = []
831
            size = 0
832
            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
833
            hashes += h
834
            size += s
835
            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
836
            hashes += h
837
            size += s
838
            for h in hashes:
839
                self.store.map_delete(h)
840
            self.node.node_purge(node, until, CLUSTER_DELETED)
841
            try:
842
                props = self._get_version(node)
843
            except NameError:
844
                self.permissions.access_clear(path)
845
            self._report_size_change(user, account, -size, {'action': 'object purge'})
846
            return
847
        
848
        path, node = self._lookup_object(account, container, name)
849
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
850
        del_size = self._apply_versioning(account, container, src_version_id)
851
        if del_size:
852
            self._report_size_change(user, account, -del_size, {'action': 'object delete'})
853
        self._report_object_change(user, account, path, details={'action': 'object delete'})
854
        self.permissions.access_clear(path)
855
        
856
        if delimiter:
857
            prefix = name + delimiter if not name.endswith(delimiter) else name
858
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
859
            paths = []
860
            for t in src_names:
861
                    path = '/'.join((account, container, t[0]))
862
                    node = t[2]
863
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
864
                del_size = self._apply_versioning(account, container, src_version_id)
865
                if del_size:
866
                    self._report_size_change(user, account, -del_size, {'action': 'object delete'})
867
                self._report_object_change(user, account, path, details={'action': 'object delete'})
868
                paths.append(path)
869
            self.permissions.access_clear_bulk(paths)
870
    
871
    @backend_method
872
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
873
        """Delete/purge an object."""
874
        
875
        logger.debug("delete_object: %s %s %s %s %s %s %s", user, account, container, name, until, prefix, delimiter)
876
        self._delete_object(user, account, container, name, until, delimiter)
877
    
878
    @backend_method
879
    def list_versions(self, user, account, container, name):
880
        """Return a list of all (version, version_timestamp) tuples for an object."""
881
        
882
        logger.debug("list_versions: %s %s %s %s", user, account, container, name)
883
        self._can_read(user, account, container, name)
884
        path, node = self._lookup_object(account, container, name)
885
        versions = self.node.node_get_versions(node)
886
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
887
    
888
    @backend_method
889
    def get_uuid(self, user, uuid):
890
        """Return the (account, container, name) for the UUID given."""
891
        
892
        logger.debug("get_uuid: %s %s", user, uuid)
893
        info = self.node.latest_uuid(uuid)
894
        if info is None:
895
            raise NameError
896
        path, serial = info
897
        account, container, name = path.split('/', 2)
898
        self._can_read(user, account, container, name)
899
        return (account, container, name)
900
    
901
    @backend_method
902
    def get_public(self, user, public):
903
        """Return the (account, container, name) for the public id given."""
904
        
905
        logger.debug("get_public: %s %s", user, public)
906
        if public is None or public < ULTIMATE_ANSWER:
907
            raise NameError
908
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
909
        if path is None:
910
            raise NameError
911
        account, container, name = path.split('/', 2)
912
        self._can_read(user, account, container, name)
913
        return (account, container, name)
914
    
915
    @backend_method(autocommit=0)
916
    def get_block(self, hash):
917
        """Return a block's data."""
918
        
919
        logger.debug("get_block: %s", hash)
920
        block = self.store.block_get(binascii.unhexlify(hash))
921
        if not block:
922
            raise NameError('Block does not exist')
923
        return block
924
    
925
    @backend_method(autocommit=0)
926
    def put_block(self, data):
927
        """Store a block and return the hash."""
928
        
929
        logger.debug("put_block: %s", len(data))
930
        return binascii.hexlify(self.store.block_put(data))
931
    
932
    @backend_method(autocommit=0)
933
    def update_block(self, hash, data, offset=0):
934
        """Update a known block and return the hash."""
935
        
936
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
937
        if offset == 0 and len(data) == self.block_size:
938
            return self.put_block(data)
939
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
940
        return binascii.hexlify(h)
941
    
942
    # Path functions.
943
    
944
    def _generate_uuid(self):
945
        return str(uuidlib.uuid4())
946
    
947
    def _put_object_node(self, path, parent, name):
948
        path = '/'.join((path, name))
949
        node = self.node.node_lookup(path)
950
        if node is None:
951
            node = self.node.node_create(parent, path)
952
        return path, node
953
    
954
    def _put_path(self, user, parent, path):
955
        node = self.node.node_create(parent, path)
956
        self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
957
        return node
958
    
959
    def _lookup_account(self, account, create=True):
960
        node = self.node.node_lookup(account)
961
        if node is None and create:
962
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
963
        return account, node
964
    
965
    def _lookup_container(self, account, container):
966
        path = '/'.join((account, container))
967
        node = self.node.node_lookup(path)
968
        if node is None:
969
            raise NameError('Container does not exist')
970
        return path, node
971
    
972
    def _lookup_object(self, account, container, name):
973
        path = '/'.join((account, container, name))
974
        node = self.node.node_lookup(path)
975
        if node is None:
976
            raise NameError('Object does not exist')
977
        return path, node
978
    
979
    def _lookup_objects(self, paths):
980
            nodes = self.node.node_lookup_bulk(paths)
981
        if nodes is None:
982
            raise NameError('Object does not exist')
983
        return paths, nodes
984
    
985
    def _get_properties(self, node, until=None):
986
        """Return properties until the timestamp given."""
987
        
988
        before = until if until is not None else inf
989
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
990
        if props is None and until is not None:
991
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
992
        if props is None:
993
            raise NameError('Path does not exist')
994
        return props
995
    
996
    def _get_statistics(self, node, until=None):
997
        """Return count, sum of size and latest timestamp of everything under node."""
998
        
999
        if until is None:
1000
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1001
        else:
1002
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1003
        if stats is None:
1004
            stats = (0, 0, 0)
1005
        return stats
1006
    
1007
    def _get_version(self, node, version=None):
1008
        if version is None:
1009
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1010
            if props is None:
1011
                raise NameError('Object does not exist')
1012
        else:
1013
            try:
1014
                version = int(version)
1015
            except ValueError:
1016
                raise IndexError('Version does not exist')
1017
            props = self.node.version_get_properties(version)
1018
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1019
                raise IndexError('Version does not exist')
1020
        return props
1021

    
1022
    def _get_versions(self, nodes, version=None):
1023
        if version is None:
1024
            props = self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1025
            if not props:
1026
                raise NameError('Object does not exist')
1027
        else:
1028
            try:
1029
                version = int(version)
1030
            except ValueError:
1031
                raise IndexError('Version does not exist')
1032
            props = self.node.version_get_properties(version)
1033
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1034
                raise IndexError('Version does not exist')
1035
        return props
1036
    
1037
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1038
        """Create a new version of the node."""
1039
        
1040
        props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1041
        if props is not None:
1042
            src_version_id = props[self.SERIAL]
1043
            src_hash = props[self.HASH]
1044
            src_size = props[self.SIZE]
1045
            src_type = props[self.TYPE]
1046
            src_checksum = props[self.CHECKSUM]
1047
        else:
1048
            src_version_id = None
1049
            src_hash = None
1050
            src_size = 0
1051
            src_type = ''
1052
            src_checksum = ''
1053
        if size is None: # Set metadata.
1054
            hash = src_hash # This way hash can be set to None (account or container).
1055
            size = src_size
1056
        if type is None:
1057
            type = src_type
1058
        if checksum is None:
1059
            checksum = src_checksum
1060
        uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
1061
        
1062
        if src_node is None:
1063
            pre_version_id = src_version_id
1064
        else:
1065
            pre_version_id = None
1066
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1067
            if props is not None:
1068
                pre_version_id = props[self.SERIAL]
1069
        if pre_version_id is not None:
1070
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1071
        
1072
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1073
        return pre_version_id, dest_version_id
1074
    
1075
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1076
        if src_version_id is not None:
1077
            self.node.attribute_copy(src_version_id, dest_version_id)
1078
        if not replace:
1079
            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
1080
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
1081
        else:
1082
            self.node.attribute_del(dest_version_id, domain)
1083
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
1084
    
1085
    def _put_metadata(self, user, node, domain, meta, replace=False):
1086
        """Create a new version and store metadata."""
1087
        
1088
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
1089
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
1090
        return src_version_id, dest_version_id
1091
    
1092
    def _list_limits(self, listing, marker, limit):
1093
        start = 0
1094
        if marker:
1095
            try:
1096
                start = listing.index(marker) + 1
1097
            except ValueError:
1098
                pass
1099
        if not limit or limit > 10000:
1100
            limit = 10000
1101
        return start, limit
1102
    
1103
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1104
        cont_prefix = path + '/'
1105
        prefix = cont_prefix + prefix
1106
        start = cont_prefix + marker if marker else None
1107
        before = until if until is not None else inf
1108
        filterq = keys if domain else []
1109
        sizeq = size_range
1110
        
1111
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1112
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1113
        objects.sort(key=lambda x: x[0])
1114
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1115
        return objects
1116
        
1117
    # Reporting functions.
1118
    
1119
    def _report_size_change(self, user, account, size, details={}):
1120
        logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1121
        account_node = self._lookup_account(account, True)[1]
1122
        total = self._get_statistics(account_node)[1]
1123
        details.update({'user': user, 'total': total})
1124
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1125
    
1126
    def _report_object_change(self, user, account, path, details={}):
1127
        logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1128
        details.update({'user': user})
1129
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1130
    
1131
    def _report_sharing_change(self, user, account, path, details={}):
1132
        logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1133
        details.update({'user': user})
1134
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1135
    
1136
    # Policy functions.
1137
    
1138
    def _check_policy(self, policy):
1139
        for k in policy.keys():
1140
            if policy[k] == '':
1141
                policy[k] = self.default_policy.get(k)
1142
        for k, v in policy.iteritems():
1143
            if k == 'quota':
1144
                q = int(v) # May raise ValueError.
1145
                if q < 0:
1146
                    raise ValueError
1147
            elif k == 'versioning':
1148
                if v not in ['auto', 'none']:
1149
                    raise ValueError
1150
            else:
1151
                raise ValueError
1152
    
1153
    def _put_policy(self, node, policy, replace):
1154
        if replace:
1155
            for k, v in self.default_policy.iteritems():
1156
                if k not in policy:
1157
                    policy[k] = v
1158
        self.node.policy_set(node, policy)
1159
    
1160
    def _get_policy(self, node):
1161
        policy = self.default_policy.copy()
1162
        policy.update(self.node.policy_get(node))
1163
        return policy
1164
    
1165
    def _apply_versioning(self, account, container, version_id):
1166
        """Delete the provided version if such is the policy.
1167
           Return size of object removed.
1168
        """
1169
        
1170
        if version_id is None:
1171
            return 0
1172
        path, node = self._lookup_container(account, container)
1173
        versioning = self._get_policy(node)['versioning']
1174
        if versioning != 'auto':
1175
            hash, size = self.node.version_remove(version_id)
1176
            self.store.map_delete(hash)
1177
            return size
1178
        return 0
1179
    
1180
    # Access control functions.
1181
    
1182
    def _check_groups(self, groups):
1183
        # raise ValueError('Bad characters in groups')
1184
        pass
1185
    
1186
    def _check_permissions(self, path, permissions):
1187
        # raise ValueError('Bad characters in permissions')
1188
        pass
1189
    
1190
    def _get_formatted_paths(self, paths):
1191
        formatted = []
1192
        for p in paths:
1193
            node = self.node.node_lookup(p)
1194
            if node is not None:
1195
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1196
            if props is not None:
1197
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1198
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1199
                formatted.append((p, self.MATCH_EXACT))
1200
        return formatted
1201
    
1202
    def _get_permissions_path(self, account, container, name):
1203
        path = '/'.join((account, container, name))
1204
        permission_paths = self.permissions.access_inherit(path)
1205
        permission_paths.sort()
1206
        permission_paths.reverse()
1207
        for p in permission_paths:
1208
            if p == path:
1209
                return p
1210
            else:
1211
                if p.count('/') < 2:
1212
                    continue
1213
                node = self.node.node_lookup(p)
1214
                if node is not None:
1215
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1216
                if props is not None:
1217
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1218
                        return p
1219
        return None
1220
    
1221
    def _can_read(self, user, account, container, name):
1222
        if user == account:
1223
            return True
1224
        path = '/'.join((account, container, name))
1225
        if self.permissions.public_get(path) is not None:
1226
            return True
1227
        path = self._get_permissions_path(account, container, name)
1228
        if not path:
1229
            raise NotAllowedError
1230
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1231
            raise NotAllowedError
1232
    
1233
    def _can_write(self, user, account, container, name):
1234
        if user == account:
1235
            return True
1236
        path = '/'.join((account, container, name))
1237
        path = self._get_permissions_path(account, container, name)
1238
        if not path:
1239
            raise NotAllowedError
1240
        if not self.permissions.access_check(path, self.WRITE, user):
1241
            raise NotAllowedError
1242
    
1243
    def _allowed_accounts(self, user):
1244
        allow = set()
1245
        for path in self.permissions.access_list_paths(user):
1246
            allow.add(path.split('/', 1)[0])
1247
        return sorted(allow)
1248
    
1249
    def _allowed_containers(self, user, account):
1250
        allow = set()
1251
        for path in self.permissions.access_list_paths(user, account):
1252
            allow.add(path.split('/', 2)[1])
1253
        return sorted(allow)