Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 07867f70

History | View | Annotate | Download (56.7 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
43
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
44

    
45
# Stripped-down version of the HashMap class found in tools.
46
class HashMap(list):
47

    
48
    def __init__(self, blocksize, blockhash):
49
        super(HashMap, self).__init__()
50
        self.blocksize = blocksize
51
        self.blockhash = blockhash
52

    
53
    def _hash_raw(self, v):
54
        h = hashlib.new(self.blockhash)
55
        h.update(v)
56
        return h.digest()
57

    
58
    def hash(self):
59
        if len(self) == 0:
60
            return self._hash_raw('')
61
        if len(self) == 1:
62
            return self.__getitem__(0)
63

    
64
        h = list(self)
65
        s = 2
66
        while s < len(h):
67
            s = s * 2
68
        h += [('\x00' * len(h[0]))] * (s - len(h))
69
        while len(h) > 1:
70
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
71
        return h[0]
72

    
73
# Default modules and settings.
74
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
75
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
76
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
77
DEFAULT_BLOCK_PATH = 'data/'
78
DEFAULT_BLOCK_UMASK = 0o022
79
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
80
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
81

    
82
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
83
QUEUE_CLIENT_ID = 'pithos'
84
QUEUE_INSTANCE_ID = '1'
85

    
86
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
87

    
88
inf = float('inf')
89

    
90
ULTIMATE_ANSWER = 42
91

    
92

    
93
logger = logging.getLogger(__name__)
94

    
95

    
96
def backend_method(func=None, autocommit=1):
97
    if func is None:
98
        def fn(func):
99
            return backend_method(func, autocommit)
100
        return fn
101

    
102
    if not autocommit:
103
        return func
104
    def fn(self, *args, **kw):
105
        self.wrapper.execute()
106
        try:
107
            self.messages = []
108
            ret = func(self, *args, **kw)
109
            for m in self.messages:
110
                self.queue.send(*m)
111
            self.wrapper.commit()
112
            return ret
113
        except:
114
            self.wrapper.rollback()
115
            raise
116
    return fn
117

    
118

    
119
class ModularBackend(BaseBackend):
120
    """A modular backend.
121
    
122
    Uses modules for SQL functions and storage.
123
    """
124
    
125
    def __init__(self, db_module=None, db_connection=None,
126
                 block_module=None, block_path=None, block_umask=None,
127
                 queue_module=None, queue_connection=None):
128
        db_module = db_module or DEFAULT_DB_MODULE
129
        db_connection = db_connection or DEFAULT_DB_CONNECTION
130
        block_module = block_module or DEFAULT_BLOCK_MODULE
131
        block_path = block_path or DEFAULT_BLOCK_PATH
132
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
133
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
134
        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
135
        
136
        self.hash_algorithm = 'sha256'
137
        self.block_size = 4 * 1024 * 1024 # 4MB
138
        
139
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
140
        
141
        def load_module(m):
142
            __import__(m)
143
            return sys.modules[m]
144
        
145
        self.db_module = load_module(db_module)
146
        self.wrapper = self.db_module.DBWrapper(db_connection)
147
        params = {'wrapper': self.wrapper}
148
        self.permissions = self.db_module.Permissions(**params)
149
        for x in ['READ', 'WRITE']:
150
            setattr(self, x, getattr(self.db_module, x))
151
        self.node = self.db_module.Node(**params)
152
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
153
            setattr(self, x, getattr(self.db_module, x))
154
        
155
        self.block_module = load_module(block_module)
156
        params = {'path': block_path,
157
                  'block_size': self.block_size,
158
                  'hash_algorithm': self.hash_algorithm,
159
                  'umask': block_umask}
160
        self.store = self.block_module.Store(**params)
161

    
162
        if queue_module and queue_connection:
163
            self.queue_module = load_module(queue_module)
164
            params = {'exchange': queue_connection,
165
                      'client_id': QUEUE_CLIENT_ID}
166
            self.queue = self.queue_module.Queue(**params)
167
        else:
168
            class NoQueue:
169
                def send(self, *args):
170
                    pass
171
                
172
                def close(self):
173
                    pass
174
            
175
            self.queue = NoQueue()
176
    
177
    def close(self):
178
        self.wrapper.close()
179
        self.queue.close()
180
    
181
    @backend_method
182
    def list_accounts(self, user, marker=None, limit=10000):
183
        """Return a list of accounts the user can access."""
184
        
185
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
186
        allowed = self._allowed_accounts(user)
187
        start, limit = self._list_limits(allowed, marker, limit)
188
        return allowed[start:start + limit]
189
    
190
    @backend_method
191
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
192
        """Return a dictionary with the account metadata for the domain."""
193
        
194
        logger.debug("get_account_meta: %s %s %s %s", user, account, domain, until)
195
        path, node = self._lookup_account(account, user == account)
196
        if user != account:
197
            if until or node is None or account not in self._allowed_accounts(user):
198
                raise NotAllowedError
199
        try:
200
            props = self._get_properties(node, until)
201
            mtime = props[self.MTIME]
202
        except NameError:
203
            props = None
204
            mtime = until
205
        count, bytes, tstamp = self._get_statistics(node, until)
206
        tstamp = max(tstamp, mtime)
207
        if until is None:
208
            modified = tstamp
209
        else:
210
            modified = self._get_statistics(node)[2] # Overall last modification.
211
            modified = max(modified, mtime)
212
        
213
        if user != account:
214
            meta = {'name': account}
215
        else:
216
            meta = {}
217
            if props is not None and include_user_defined:
218
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
219
            if until is not None:
220
                meta.update({'until_timestamp': tstamp})
221
            meta.update({'name': account, 'count': count, 'bytes': bytes})
222
        meta.update({'modified': modified})
223
        return meta
224
    
225
    @backend_method
226
    def update_account_meta(self, user, account, domain, meta, replace=False):
227
        """Update the metadata associated with the account for the domain."""
228
        
229
        logger.debug("update_account_meta: %s %s %s %s %s", user, account, domain, meta, replace)
230
        if user != account:
231
            raise NotAllowedError
232
        path, node = self._lookup_account(account, True)
233
        self._put_metadata(user, node, domain, meta, replace)
234
    
235
    @backend_method
236
    def get_account_groups(self, user, account):
237
        """Return a dictionary with the user groups defined for this account."""
238
        
239
        logger.debug("get_account_groups: %s %s", user, account)
240
        if user != account:
241
            if account not in self._allowed_accounts(user):
242
                raise NotAllowedError
243
            return {}
244
        self._lookup_account(account, True)
245
        return self.permissions.group_dict(account)
246
    
247
    @backend_method
248
    def update_account_groups(self, user, account, groups, replace=False):
249
        """Update the groups associated with the account."""
250
        
251
        logger.debug("update_account_groups: %s %s %s %s", user, account, groups, replace)
252
        if user != account:
253
            raise NotAllowedError
254
        self._lookup_account(account, True)
255
        self._check_groups(groups)
256
        if replace:
257
            self.permissions.group_destroy(account)
258
        for k, v in groups.iteritems():
259
            if not replace: # If not already deleted.
260
                self.permissions.group_delete(account, k)
261
            if v:
262
                self.permissions.group_addmany(account, k, v)
263
    
264
    @backend_method
265
    def get_account_policy(self, user, account):
266
        """Return a dictionary with the account policy."""
267
        
268
        logger.debug("get_account_policy: %s %s", user, account)
269
        if user != account:
270
            if account not in self._allowed_accounts(user):
271
                raise NotAllowedError
272
            return {}
273
        path, node = self._lookup_account(account, True)
274
        return self._get_policy(node)
275
    
276
    @backend_method
277
    def update_account_policy(self, user, account, policy, replace=False):
278
        """Update the policy associated with the account."""
279
        
280
        logger.debug("update_account_policy: %s %s %s %s", user, account, policy, replace)
281
        if user != account:
282
            raise NotAllowedError
283
        path, node = self._lookup_account(account, True)
284
        self._check_policy(policy)
285
        self._put_policy(node, policy, replace)
286
    
287
    @backend_method
288
    def put_account(self, user, account, policy={}):
289
        """Create a new account with the given name."""
290
        
291
        logger.debug("put_account: %s %s %s", user, account, policy)
292
        if user != account:
293
            raise NotAllowedError
294
        node = self.node.node_lookup(account)
295
        if node is not None:
296
            raise AccountExists('Account already exists')
297
        if policy:
298
            self._check_policy(policy)
299
        node = self._put_path(user, self.ROOTNODE, account)
300
        self._put_policy(node, policy, True)
301
    
302
    @backend_method
303
    def delete_account(self, user, account):
304
        """Delete the account with the given name."""
305
        
306
        logger.debug("delete_account: %s %s", user, account)
307
        if user != account:
308
            raise NotAllowedError
309
        node = self.node.node_lookup(account)
310
        if node is None:
311
            return
312
        if not self.node.node_remove(node):
313
            raise AccountNotEmpty('Account is not empty')
314
        self.permissions.group_destroy(account)
315
    
316
    @backend_method
317
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
318
        """Return a list of containers existing under an account."""
319
        
320
        logger.debug("list_containers: %s %s %s %s %s %s %s", user, account, marker, limit, shared, until, public)
321
        if user != account:
322
            if until or account not in self._allowed_accounts(user):
323
                raise NotAllowedError
324
            allowed = self._allowed_containers(user, account)
325
            start, limit = self._list_limits(allowed, marker, limit)
326
            return allowed[start:start + limit]
327
        if shared or public:
328
            allowed = set()
329
            if shared:
330
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
331
            if public:
332
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
333
            allowed = sorted(allowed)
334
            start, limit = self._list_limits(allowed, marker, limit)
335
            return allowed[start:start + limit]
336
        node = self.node.node_lookup(account)
337
        containers = [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
338
        start, limit = self._list_limits([x[0] for x in containers], marker, limit)
339
        return containers[start:start + limit]
340
    
341
    @backend_method
342
    def list_container_meta(self, user, account, container, domain, until=None):
343
        """Return a list with all the container's object meta keys for the domain."""
344
        
345
        logger.debug("list_container_meta: %s %s %s %s %s", user, account, container, domain, until)
346
        allowed = []
347
        if user != account:
348
            if until:
349
                raise NotAllowedError
350
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
351
            if not allowed:
352
                raise NotAllowedError
353
        path, node = self._lookup_container(account, container)
354
        before = until if until is not None else inf
355
        allowed = self._get_formatted_paths(allowed)
356
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
357
    
358
    @backend_method
359
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
360
        """Return a dictionary with the container metadata for the domain."""
361
        
362
        logger.debug("get_container_meta: %s %s %s %s %s", user, account, container, domain, until)
363
        if user != account:
364
            if until or container not in self._allowed_containers(user, account):
365
                raise NotAllowedError
366
        path, node = self._lookup_container(account, container)
367
        props = self._get_properties(node, until)
368
        mtime = props[self.MTIME]
369
        count, bytes, tstamp = self._get_statistics(node, until)
370
        tstamp = max(tstamp, mtime)
371
        if until is None:
372
            modified = tstamp
373
        else:
374
            modified = self._get_statistics(node)[2] # Overall last modification.
375
            modified = max(modified, mtime)
376
        
377
        if user != account:
378
            meta = {'name': container}
379
        else:
380
            meta = {}
381
            if include_user_defined:
382
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
383
            if until is not None:
384
                meta.update({'until_timestamp': tstamp})
385
            meta.update({'name': container, 'count': count, 'bytes': bytes})
386
        meta.update({'modified': modified})
387
        return meta
388
    
389
    @backend_method
390
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
391
        """Update the metadata associated with the container for the domain."""
392
        
393
        logger.debug("update_container_meta: %s %s %s %s %s %s", user, account, container, domain, meta, replace)
394
        if user != account:
395
            raise NotAllowedError
396
        path, node = self._lookup_container(account, container)
397
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
398
        if src_version_id is not None:
399
            versioning = self._get_policy(node)['versioning']
400
            if versioning != 'auto':
401
                self.node.version_remove(src_version_id)
402
    
403
    @backend_method
404
    def get_container_policy(self, user, account, container):
405
        """Return a dictionary with the container policy."""
406
        
407
        logger.debug("get_container_policy: %s %s %s", user, account, container)
408
        if user != account:
409
            if container not in self._allowed_containers(user, account):
410
                raise NotAllowedError
411
            return {}
412
        path, node = self._lookup_container(account, container)
413
        return self._get_policy(node)
414
    
415
    @backend_method
416
    def update_container_policy(self, user, account, container, policy, replace=False):
417
        """Update the policy associated with the container."""
418
        
419
        logger.debug("update_container_policy: %s %s %s %s %s", user, account, container, policy, replace)
420
        if user != account:
421
            raise NotAllowedError
422
        path, node = self._lookup_container(account, container)
423
        self._check_policy(policy)
424
        self._put_policy(node, policy, replace)
425
    
426
    @backend_method
427
    def put_container(self, user, account, container, policy={}):
428
        """Create a new container with the given name."""
429
        
430
        logger.debug("put_container: %s %s %s %s", user, account, container, policy)
431
        if user != account:
432
            raise NotAllowedError
433
        try:
434
            path, node = self._lookup_container(account, container)
435
        except NameError:
436
            pass
437
        else:
438
            raise ContainerExists('Container already exists')
439
        if policy:
440
            self._check_policy(policy)
441
        path = '/'.join((account, container))
442
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
443
        self._put_policy(node, policy, True)
444
    
445
    @backend_method
446
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
447
        """Delete/purge the container with the given name."""
448
        
449
        logger.debug("delete_container: %s %s %s %s %s %s", user, account, container, until, prefix, delimiter)
450
        if user != account:
451
            raise NotAllowedError
452
        path, node = self._lookup_container(account, container)
453
        
454
        if until is not None:
455
            hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
456
            for h in hashes:
457
                self.store.map_delete(h)
458
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
459
            self._report_size_change(user, account, -size, {'action': 'container purge'})
460
            return
461
        
462
        if self._get_statistics(node)[0] > 0:
463
            raise ContainerNotEmpty('Container is not empty')
464
        hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
465
        for h in hashes:
466
            self.store.map_delete(h)
467
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
468
        self.node.node_remove(node)
469
        self._report_size_change(user, account, -size, {'action': 'container delete'})
470
    
471
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
472
        if user != account and until:
473
            raise NotAllowedError
474
        if shared and public:
475
            # get shared first
476
            shared = self._list_object_permissions(user, account, container, prefix, shared=True, public=False)
477
            objects = []
478
            if shared:
479
                path, node = self._lookup_container(account, container)
480
                shared = self._get_formatted_paths(shared)
481
                objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props)
482
            
483
            # get public
484
            objects.extend(self._list_public_object_properties(user, account, container, prefix, all_props))
485
            
486
            objects.sort(key=lambda x: x[0])
487
            start, limit = self._list_limits([x[0] for x in objects], marker, limit)
488
            return objects[start:start + limit]
489
        elif public:
490
            objects = self._list_public_object_properties(user, account, container, prefix, all_props)
491
            start, limit = self._list_limits([x[0] for x in objects], marker, limit)
492
            return objects[start:start + limit]
493
        
494
        allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
495
        if shared and not allowed:
496
            return []
497
        path, node = self._lookup_container(account, container)
498
        allowed = self._get_formatted_paths(allowed)
499
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
500
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
501
        return objects[start:start + limit]
502
    
503
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
504
        public = self._list_object_permissions(user, account, container, prefix, shared=False, public=True)
505
        paths, nodes = self._lookup_objects(public)
506
        path = '/'.join((account, container))
507
        cont_prefix = path + '/'
508
        paths = [x[len(cont_prefix):] for x in paths]
509
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
510
        objects = [(path,) + props for path, props in zip(paths, props)]
511
        return objects
512
        
513
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
514
        objects = []
515
        while True:
516
            marker = objects[-1] if objects else None
517
            limit = 10000
518
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
519
            objects.extend(l)
520
            if not l or len(l) < limit:
521
                break
522
        return objects
523
    
524
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
525
        allowed = []
526
        path = '/'.join((account, container, prefix)).rstrip('/')
527
        if user != account:
528
            allowed = self.permissions.access_list_paths(user, path)
529
            if not allowed:
530
                raise NotAllowedError
531
        else:
532
            allowed = set()
533
            if shared:
534
                allowed.update(self.permissions.access_list_shared(path))
535
            if public:
536
                allowed.update([x[0] for x in self.permissions.public_list(path)])
537
            allowed = sorted(allowed)
538
            if not allowed:
539
                return []
540
        return allowed
541
    
542
    @backend_method
543
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
544
        """Return a list of object (name, version_id) tuples existing under a container."""
545
        
546
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
547
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
548
    
549
    @backend_method
550
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
551
        """Return a list of object metadata dicts existing under a container."""
552
        
553
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
554
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
555
        objects = []
556
        for p in props:
557
            if len(p) == 2:
558
                objects.append({'subdir': p[0]})
559
            else:
560
                objects.append({'name': p[0],
561
                                'bytes': p[self.SIZE + 1],
562
                                'type': p[self.TYPE + 1],
563
                                'hash': p[self.HASH + 1],
564
                                'version': p[self.SERIAL + 1],
565
                                'version_timestamp': p[self.MTIME + 1],
566
                                'modified': p[self.MTIME + 1] if until is None else None,
567
                                'modified_by': p[self.MUSER + 1],
568
                                'uuid': p[self.UUID + 1],
569
                                'checksum': p[self.CHECKSUM + 1]})
570
        return objects
571
    
572
    @backend_method
573
    def list_object_permissions(self, user, account, container, prefix=''):
574
        """Return a list of paths that enforce permissions under a container."""
575
        
576
        logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix)
577
        return self._list_object_permissions(user, account, container, prefix, True, False)
578
    
579
    @backend_method
580
    def list_object_public(self, user, account, container, prefix=''):
581
        """Return a dict mapping paths to public ids for objects that are public under a container."""
582
        
583
        logger.debug("list_object_public: %s %s %s %s", user, account, container, prefix)
584
        public = {}
585
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
586
            public[path] = p + ULTIMATE_ANSWER
587
        return public
588
    
589
    @backend_method
590
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
591
        """Return a dictionary with the object metadata for the domain."""
592
        
593
        logger.debug("get_object_meta: %s %s %s %s %s %s", user, account, container, name, domain, version)
594
        self._can_read(user, account, container, name)
595
        path, node = self._lookup_object(account, container, name)
596
        props = self._get_version(node, version)
597
        if version is None:
598
            modified = props[self.MTIME]
599
        else:
600
            try:
601
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
602
            except NameError: # Object may be deleted.
603
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
604
                if del_props is None:
605
                    raise ItemNotExists('Object does not exist')
606
                modified = del_props[self.MTIME]
607
        
608
        meta = {}
609
        if include_user_defined:
610
            meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
611
        meta.update({'name': name,
612
                     'bytes': props[self.SIZE],
613
                     'type': props[self.TYPE],
614
                     'hash': props[self.HASH],
615
                     'version': props[self.SERIAL],
616
                     'version_timestamp': props[self.MTIME],
617
                     'modified': modified,
618
                     'modified_by': props[self.MUSER],
619
                     'uuid': props[self.UUID],
620
                     'checksum': props[self.CHECKSUM]})
621
        return meta
622
    
623
    @backend_method
624
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
625
        """Update the metadata associated with the object for the domain and return the new version."""
626
        
627
        logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace)
628
        self._can_write(user, account, container, name)
629
        path, node = self._lookup_object(account, container, name)
630
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
631
        self._apply_versioning(account, container, src_version_id)
632
        return dest_version_id
633
    
634
    @backend_method
635
    def get_object_permissions(self, user, account, container, name):
636
        """Return the action allowed on the object, the path
637
        from which the object gets its permissions from,
638
        along with a dictionary containing the permissions."""
639
        
640
        logger.debug("get_object_permissions: %s %s %s %s", user, account, container, name)
641
        allowed = 'write'
642
        permissions_path = self._get_permissions_path(account, container, name)
643
        if user != account:
644
            if self.permissions.access_check(permissions_path, self.WRITE, user):
645
                allowed = 'write'
646
            elif self.permissions.access_check(permissions_path, self.READ, user):
647
                allowed = 'read'
648
            else:
649
                raise NotAllowedError
650
        self._lookup_object(account, container, name)
651
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
652
    
653
    @backend_method
654
    def update_object_permissions(self, user, account, container, name, permissions):
655
        """Update the permissions associated with the object."""
656
        
657
        logger.debug("update_object_permissions: %s %s %s %s %s", user, account, container, name, permissions)
658
        if user != account:
659
            raise NotAllowedError
660
        path = self._lookup_object(account, container, name)[0]
661
        self._check_permissions(path, permissions)
662
        self.permissions.access_set(path, permissions)
663
        self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
664
    
665
    @backend_method
666
    def get_object_public(self, user, account, container, name):
667
        """Return the public id of the object if applicable."""
668
        
669
        logger.debug("get_object_public: %s %s %s %s", user, account, container, name)
670
        self._can_read(user, account, container, name)
671
        path = self._lookup_object(account, container, name)[0]
672
        p = self.permissions.public_get(path)
673
        if p is not None:
674
            p += ULTIMATE_ANSWER
675
        return p
676
    
677
    @backend_method
678
    def update_object_public(self, user, account, container, name, public):
679
        """Update the public status of the object."""
680
        
681
        logger.debug("update_object_public: %s %s %s %s %s", user, account, container, name, public)
682
        self._can_write(user, account, container, name)
683
        path = self._lookup_object(account, container, name)[0]
684
        if not public:
685
            self.permissions.public_unset(path)
686
        else:
687
            self.permissions.public_set(path)
688
    
689
    @backend_method
690
    def get_object_hashmap(self, user, account, container, name, version=None):
691
        """Return the object's size and a list with partial hashes."""
692
        
693
        logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version)
694
        self._can_read(user, account, container, name)
695
        path, node = self._lookup_object(account, container, name)
696
        props = self._get_version(node, version)
697
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
698
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
699
    
700
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
701
        if permissions is not None and user != account:
702
            raise NotAllowedError
703
        self._can_write(user, account, container, name)
704
        if permissions is not None:
705
            path = '/'.join((account, container, name))
706
            self._check_permissions(path, permissions)
707
        
708
        account_path, account_node = self._lookup_account(account, True)
709
        container_path, container_node = self._lookup_container(account, container)
710
        path, node = self._put_object_node(container_path, container_node, name)
711
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
712
        
713
        # Handle meta.
714
        if src_version_id is None:
715
            src_version_id = pre_version_id
716
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
717
        
718
        # Check quota.
719
        del_size = self._apply_versioning(account, container, pre_version_id)
720
        size_delta = size - del_size
721
        if size_delta > 0:
722
            account_quota = long(self._get_policy(account_node)['quota'])
723
            container_quota = long(self._get_policy(container_node)['quota'])
724
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
725
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
726
                # This must be executed in a transaction, so the version is never created if it fails.
727
                raise QuotaError
728
        self._report_size_change(user, account, size_delta, {'action': 'object update'})
729
        
730
        if permissions is not None:
731
            self.permissions.access_set(path, permissions)
732
            self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
733
        
734
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
735
        return dest_version_id
736
    
737
    @backend_method
738
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
739
        """Create/update an object with the specified size and partial hashes."""
740
        
741
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user, account, container, name, size, type, hashmap, checksum)
742
        if size == 0: # No such thing as an empty hashmap.
743
            hashmap = [self.put_block('')]
744
        map = HashMap(self.block_size, self.hash_algorithm)
745
        map.extend([binascii.unhexlify(x) for x in hashmap])
746
        missing = self.store.block_search(map)
747
        if missing:
748
            ie = IndexError()
749
            ie.data = [binascii.hexlify(x) for x in missing]
750
            raise ie
751
        
752
        hash = map.hash()
753
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
754
        self.store.map_put(hash, map)
755
        return dest_version_id
756
    
757
    @backend_method
758
    def update_object_checksum(self, user, account, container, name, version, checksum):
759
        """Update an object's checksum."""
760
        
761
        logger.debug("update_object_checksum: %s %s %s %s %s %s", user, account, container, name, version, checksum)
762
        # Update objects with greater version and same hashmap and size (fix metadata updates).
763
        self._can_write(user, account, container, name)
764
        path, node = self._lookup_object(account, container, name)
765
        props = self._get_version(node, version)
766
        versions = self.node.node_get_versions(node)
767
        for x in versions:
768
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
769
                self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
770
    
771
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
772
        dest_version_ids = []
773
        self._can_read(user, src_account, src_container, src_name)
774
        path, node = self._lookup_object(src_account, src_container, src_name)
775
        # TODO: Will do another fetch of the properties in duplicate version...
776
        props = self._get_version(node, src_version) # Check to see if source exists.
777
        src_version_id = props[self.SERIAL]
778
        hash = props[self.HASH]
779
        size = props[self.SIZE]
780
        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
781
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
782
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
783
                self._delete_object(user, src_account, src_container, src_name)
784
        
785
        if delimiter:
786
            prefix = src_name + delimiter if not src_name.endswith(delimiter) else src_name
787
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
788
            src_names.sort(key=lambda x: x[2]) # order by nodes
789
            paths = [elem[0] for elem in src_names]
790
            nodes = [elem[2] for elem in src_names]
791
            # TODO: Will do another fetch of the properties in duplicate version...
792
            props = self._get_versions(nodes) # Check to see if source exists.
793
            
794
            for prop, path, node in zip(props, paths, nodes):
795
                src_version_id = prop[self.SERIAL]
796
                hash = prop[self.HASH]
797
                vtype = prop[self.TYPE]
798
                size = prop[self.SIZE]
799
                dest_prefix = dest_name + delimiter if not dest_name.endswith(delimiter) else dest_name
800
                vdest_name = path.replace(prefix, dest_prefix, 1)
801
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
802
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
803
                        self._delete_object(user, src_account, src_container, path)
804
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
805
    
806
    @backend_method
807
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
808
        """Copy an object's data and metadata."""
809
        
810
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
811
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
812
        return dest_version_id
813
    
814
    @backend_method
815
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
816
        """Move an object's data and metadata."""
817
        
818
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
819
        if user != src_account:
820
            raise NotAllowedError
821
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
822
        return dest_version_id
823
    
824
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
825
        if user != account:
826
            raise NotAllowedError
827
        
828
        if until is not None:
829
            path = '/'.join((account, container, name))
830
            node = self.node.node_lookup(path)
831
            if node is None:
832
                return
833
            hashes = []
834
            size = 0
835
            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
836
            hashes += h
837
            size += s
838
            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
839
            hashes += h
840
            size += s
841
            for h in hashes:
842
                self.store.map_delete(h)
843
            self.node.node_purge(node, until, CLUSTER_DELETED)
844
            try:
845
                props = self._get_version(node)
846
            except NameError:
847
                self.permissions.access_clear(path)
848
            self._report_size_change(user, account, -size, {'action': 'object purge'})
849
            return
850
        
851
        path, node = self._lookup_object(account, container, name)
852
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
853
        del_size = self._apply_versioning(account, container, src_version_id)
854
        if del_size:
855
            self._report_size_change(user, account, -del_size, {'action': 'object delete'})
856
        self._report_object_change(user, account, path, details={'action': 'object delete'})
857
        self.permissions.access_clear(path)
858
        
859
        if delimiter:
860
            prefix = name + delimiter if not name.endswith(delimiter) else name
861
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
862
            paths = []
863
            for t in src_names:
864
                    path = '/'.join((account, container, t[0]))
865
                    node = t[2]
866
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
867
                del_size = self._apply_versioning(account, container, src_version_id)
868
                if del_size:
869
                    self._report_size_change(user, account, -del_size, {'action': 'object delete'})
870
                self._report_object_change(user, account, path, details={'action': 'object delete'})
871
                paths.append(path)
872
            self.permissions.access_clear_bulk(paths)
873
    
874
    @backend_method
875
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
876
        """Delete/purge an object."""
877
        
878
        logger.debug("delete_object: %s %s %s %s %s %s %s", user, account, container, name, until, prefix, delimiter)
879
        self._delete_object(user, account, container, name, until, delimiter)
880
    
881
    @backend_method
882
    def list_versions(self, user, account, container, name):
883
        """Return a list of all (version, version_timestamp) tuples for an object."""
884
        
885
        logger.debug("list_versions: %s %s %s %s", user, account, container, name)
886
        self._can_read(user, account, container, name)
887
        path, node = self._lookup_object(account, container, name)
888
        versions = self.node.node_get_versions(node)
889
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
890
    
891
    @backend_method
892
    def get_uuid(self, user, uuid):
893
        """Return the (account, container, name) for the UUID given."""
894
        
895
        logger.debug("get_uuid: %s %s", user, uuid)
896
        info = self.node.latest_uuid(uuid)
897
        if info is None:
898
            raise NameError
899
        path, serial = info
900
        account, container, name = path.split('/', 2)
901
        self._can_read(user, account, container, name)
902
        return (account, container, name)
903
    
904
    @backend_method
905
    def get_public(self, user, public):
906
        """Return the (account, container, name) for the public id given."""
907
        
908
        logger.debug("get_public: %s %s", user, public)
909
        if public is None or public < ULTIMATE_ANSWER:
910
            raise NameError
911
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
912
        if path is None:
913
            raise NameError
914
        account, container, name = path.split('/', 2)
915
        self._can_read(user, account, container, name)
916
        return (account, container, name)
917
    
918
    @backend_method(autocommit=0)
919
    def get_block(self, hash):
920
        """Return a block's data."""
921
        
922
        logger.debug("get_block: %s", hash)
923
        block = self.store.block_get(binascii.unhexlify(hash))
924
        if not block:
925
            raise ItemNotExists('Block does not exist')
926
        return block
927
    
928
    @backend_method(autocommit=0)
929
    def put_block(self, data):
930
        """Store a block and return the hash."""
931
        
932
        logger.debug("put_block: %s", len(data))
933
        return binascii.hexlify(self.store.block_put(data))
934
    
935
    @backend_method(autocommit=0)
936
    def update_block(self, hash, data, offset=0):
937
        """Update a known block and return the hash."""
938
        
939
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
940
        if offset == 0 and len(data) == self.block_size:
941
            return self.put_block(data)
942
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
943
        return binascii.hexlify(h)
944
    
945
    # Path functions.
946
    
947
    def _generate_uuid(self):
948
        return str(uuidlib.uuid4())
949
    
950
    def _put_object_node(self, path, parent, name):
951
        path = '/'.join((path, name))
952
        node = self.node.node_lookup(path)
953
        if node is None:
954
            node = self.node.node_create(parent, path)
955
        return path, node
956
    
957
    def _put_path(self, user, parent, path):
958
        node = self.node.node_create(parent, path)
959
        self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
960
        return node
961
    
962
    def _lookup_account(self, account, create=True):
963
        node = self.node.node_lookup(account)
964
        if node is None and create:
965
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
966
        return account, node
967
    
968
    def _lookup_container(self, account, container):
969
        path = '/'.join((account, container))
970
        node = self.node.node_lookup(path)
971
        if node is None:
972
            raise ItemNotExists('Container does not exist')
973
        return path, node
974
    
975
    def _lookup_object(self, account, container, name):
976
        path = '/'.join((account, container, name))
977
        node = self.node.node_lookup(path)
978
        if node is None:
979
            raise ItemNotExists('Object does not exist')
980
        return path, node
981
    
982
    def _lookup_objects(self, paths):
983
        nodes = self.node.node_lookup_bulk(paths)
984
        return paths, nodes
985
    
986
    def _get_properties(self, node, until=None):
987
        """Return properties until the timestamp given."""
988
        
989
        before = until if until is not None else inf
990
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
991
        if props is None and until is not None:
992
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
993
        if props is None:
994
            raise ItemNotExists('Path does not exist')
995
        return props
996
    
997
    def _get_statistics(self, node, until=None):
998
        """Return count, sum of size and latest timestamp of everything under node."""
999
        
1000
        if until is None:
1001
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1002
        else:
1003
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1004
        if stats is None:
1005
            stats = (0, 0, 0)
1006
        return stats
1007
    
1008
    def _get_version(self, node, version=None):
1009
        if version is None:
1010
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1011
            if props is None:
1012
                raise ItemNotExists('Object does not exist')
1013
        else:
1014
            try:
1015
                version = int(version)
1016
            except ValueError:
1017
                raise VersionNotExists('Version does not exist')
1018
            props = self.node.version_get_properties(version)
1019
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1020
                raise VersionNotExists('Version does not exist')
1021
        return props
1022

    
1023
    def _get_versions(self, nodes):
1024
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1025
    
1026
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1027
        """Create a new version of the node."""
1028
        
1029
        props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1030
        if props is not None:
1031
            src_version_id = props[self.SERIAL]
1032
            src_hash = props[self.HASH]
1033
            src_size = props[self.SIZE]
1034
            src_type = props[self.TYPE]
1035
            src_checksum = props[self.CHECKSUM]
1036
        else:
1037
            src_version_id = None
1038
            src_hash = None
1039
            src_size = 0
1040
            src_type = ''
1041
            src_checksum = ''
1042
        if size is None: # Set metadata.
1043
            hash = src_hash # This way hash can be set to None (account or container).
1044
            size = src_size
1045
        if type is None:
1046
            type = src_type
1047
        if checksum is None:
1048
            checksum = src_checksum
1049
        uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
1050
        
1051
        if src_node is None:
1052
            pre_version_id = src_version_id
1053
        else:
1054
            pre_version_id = None
1055
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1056
            if props is not None:
1057
                pre_version_id = props[self.SERIAL]
1058
        if pre_version_id is not None:
1059
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1060
        
1061
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1062
        return pre_version_id, dest_version_id
1063
    
1064
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1065
        if src_version_id is not None:
1066
            self.node.attribute_copy(src_version_id, dest_version_id)
1067
        if not replace:
1068
            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
1069
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
1070
        else:
1071
            self.node.attribute_del(dest_version_id, domain)
1072
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
1073
    
1074
    def _put_metadata(self, user, node, domain, meta, replace=False):
1075
        """Create a new version and store metadata."""
1076
        
1077
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
1078
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
1079
        return src_version_id, dest_version_id
1080
    
1081
    def _list_limits(self, listing, marker, limit):
1082
        start = 0
1083
        if marker:
1084
            try:
1085
                start = listing.index(marker) + 1
1086
            except ValueError:
1087
                pass
1088
        if not limit or limit > 10000:
1089
            limit = 10000
1090
        return start, limit
1091
    
1092
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1093
        cont_prefix = path + '/'
1094
        prefix = cont_prefix + prefix
1095
        start = cont_prefix + marker if marker else None
1096
        before = until if until is not None else inf
1097
        filterq = keys if domain else []
1098
        sizeq = size_range
1099
        
1100
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1101
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1102
        objects.sort(key=lambda x: x[0])
1103
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1104
        return objects
1105
        
1106
    # Reporting functions.
1107
    
1108
    def _report_size_change(self, user, account, size, details={}):
1109
        logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1110
        account_node = self._lookup_account(account, True)[1]
1111
        total = self._get_statistics(account_node)[1]
1112
        details.update({'user': user, 'total': total})
1113
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1114
    
1115
    def _report_object_change(self, user, account, path, details={}):
1116
        logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1117
        details.update({'user': user})
1118
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1119
    
1120
    def _report_sharing_change(self, user, account, path, details={}):
1121
        logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1122
        details.update({'user': user})
1123
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1124
    
1125
    # Policy functions.
1126
    
1127
    def _check_policy(self, policy):
1128
        for k in policy.keys():
1129
            if policy[k] == '':
1130
                policy[k] = self.default_policy.get(k)
1131
        for k, v in policy.iteritems():
1132
            if k == 'quota':
1133
                q = int(v) # May raise ValueError.
1134
                if q < 0:
1135
                    raise ValueError
1136
            elif k == 'versioning':
1137
                if v not in ['auto', 'none']:
1138
                    raise ValueError
1139
            else:
1140
                raise ValueError
1141
    
1142
    def _put_policy(self, node, policy, replace):
1143
        if replace:
1144
            for k, v in self.default_policy.iteritems():
1145
                if k not in policy:
1146
                    policy[k] = v
1147
        self.node.policy_set(node, policy)
1148
    
1149
    def _get_policy(self, node):
1150
        policy = self.default_policy.copy()
1151
        policy.update(self.node.policy_get(node))
1152
        return policy
1153
    
1154
    def _apply_versioning(self, account, container, version_id):
1155
        """Delete the provided version if such is the policy.
1156
           Return size of object removed.
1157
        """
1158
        
1159
        if version_id is None:
1160
            return 0
1161
        path, node = self._lookup_container(account, container)
1162
        versioning = self._get_policy(node)['versioning']
1163
        if versioning != 'auto':
1164
            hash, size = self.node.version_remove(version_id)
1165
            self.store.map_delete(hash)
1166
            return size
1167
        return 0
1168
    
1169
    # Access control functions.
1170
    
1171
    def _check_groups(self, groups):
1172
        # raise ValueError('Bad characters in groups')
1173
        pass
1174
    
1175
    def _check_permissions(self, path, permissions):
1176
        # raise ValueError('Bad characters in permissions')
1177
        pass
1178
    
1179
    def _get_formatted_paths(self, paths):
1180
        formatted = []
1181
        for p in paths:
1182
            node = self.node.node_lookup(p)
1183
            if node is not None:
1184
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1185
            if props is not None:
1186
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1187
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1188
                formatted.append((p, self.MATCH_EXACT))
1189
        return formatted
1190
    
1191
    def _get_permissions_path(self, account, container, name):
1192
        path = '/'.join((account, container, name))
1193
        permission_paths = self.permissions.access_inherit(path)
1194
        permission_paths.sort()
1195
        permission_paths.reverse()
1196
        for p in permission_paths:
1197
            if p == path:
1198
                return p
1199
            else:
1200
                if p.count('/') < 2:
1201
                    continue
1202
                node = self.node.node_lookup(p)
1203
                if node is not None:
1204
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1205
                if props is not None:
1206
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1207
                        return p
1208
        return None
1209
    
1210
    def _can_read(self, user, account, container, name):
1211
        if user == account:
1212
            return True
1213
        path = '/'.join((account, container, name))
1214
        if self.permissions.public_get(path) is not None:
1215
            return True
1216
        path = self._get_permissions_path(account, container, name)
1217
        if not path:
1218
            raise NotAllowedError
1219
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1220
            raise NotAllowedError
1221
    
1222
    def _can_write(self, user, account, container, name):
1223
        if user == account:
1224
            return True
1225
        path = '/'.join((account, container, name))
1226
        path = self._get_permissions_path(account, container, name)
1227
        if not path:
1228
            raise NotAllowedError
1229
        if not self.permissions.access_check(path, self.WRITE, user):
1230
            raise NotAllowedError
1231
    
1232
    def _allowed_accounts(self, user):
1233
        allow = set()
1234
        for path in self.permissions.access_list_paths(user):
1235
            allow.add(path.split('/', 1)[0])
1236
        return sorted(allow)
1237
    
1238
    def _allowed_containers(self, user, account):
1239
        allow = set()
1240
        for path in self.permissions.access_list_paths(user, account):
1241
            allow.add(path.split('/', 2)[1])
1242
        return sorted(allow)