Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 7efc9f86

History | View | Annotate | Download (56.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
43
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
44

    
45
# Stripped-down version of the HashMap class found in tools.
46
class HashMap(list):
47

    
48
    def __init__(self, blocksize, blockhash):
49
        super(HashMap, self).__init__()
50
        self.blocksize = blocksize
51
        self.blockhash = blockhash
52

    
53
    def _hash_raw(self, v):
54
        h = hashlib.new(self.blockhash)
55
        h.update(v)
56
        return h.digest()
57

    
58
    def hash(self):
59
        if len(self) == 0:
60
            return self._hash_raw('')
61
        if len(self) == 1:
62
            return self.__getitem__(0)
63

    
64
        h = list(self)
65
        s = 2
66
        while s < len(h):
67
            s = s * 2
68
        h += [('\x00' * len(h[0]))] * (s - len(h))
69
        while len(h) > 1:
70
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
71
        return h[0]
72

    
73
# Default modules and settings.
74
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
75
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
76
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
77
DEFAULT_BLOCK_PATH = 'data/'
78
DEFAULT_BLOCK_UMASK = 0o022
79
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
80
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
81

    
82
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
83
QUEUE_CLIENT_ID = 'pithos'
84
QUEUE_INSTANCE_ID = '1'
85

    
86
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
87

    
88
inf = float('inf')
89

    
90
ULTIMATE_ANSWER = 42
91

    
92

    
93
logger = logging.getLogger(__name__)
94

    
95

    
96
def backend_method(func=None, autocommit=1):
97
    if func is None:
98
        def fn(func):
99
            return backend_method(func, autocommit)
100
        return fn
101

    
102
    if not autocommit:
103
        return func
104
    def fn(self, *args, **kw):
105
        self.wrapper.execute()
106
        try:
107
            self.messages = []
108
            ret = func(self, *args, **kw)
109
            for m in self.messages:
110
                self.queue.send(*m)
111
            self.wrapper.commit()
112
            return ret
113
        except:
114
            self.wrapper.rollback()
115
            raise
116
    return fn
117

    
118

    
119
class ModularBackend(BaseBackend):
120
    """A modular backend.
121
    
122
    Uses modules for SQL functions and storage.
123
    """
124
    
125
    def __init__(self, db_module=None, db_connection=None,
126
                 block_module=None, block_path=None, block_umask=None,
127
                 queue_module=None, queue_connection=None):
128
        db_module = db_module or DEFAULT_DB_MODULE
129
        db_connection = db_connection or DEFAULT_DB_CONNECTION
130
        block_module = block_module or DEFAULT_BLOCK_MODULE
131
        block_path = block_path or DEFAULT_BLOCK_PATH
132
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
133
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
134
        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
135
        
136
        self.hash_algorithm = 'sha256'
137
        self.block_size = 4 * 1024 * 1024 # 4MB
138
        
139
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
140
        
141
        def load_module(m):
142
            __import__(m)
143
            return sys.modules[m]
144
        
145
        self.db_module = load_module(db_module)
146
        self.wrapper = self.db_module.DBWrapper(db_connection)
147
        params = {'wrapper': self.wrapper}
148
        self.permissions = self.db_module.Permissions(**params)
149
        for x in ['READ', 'WRITE']:
150
            setattr(self, x, getattr(self.db_module, x))
151
        self.node = self.db_module.Node(**params)
152
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
153
            setattr(self, x, getattr(self.db_module, x))
154
        
155
        self.block_module = load_module(block_module)
156
        params = {'path': block_path,
157
                  'block_size': self.block_size,
158
                  'hash_algorithm': self.hash_algorithm,
159
                  'umask': block_umask}
160
        self.store = self.block_module.Store(**params)
161

    
162
        if queue_module and queue_connection:
163
            self.queue_module = load_module(queue_module)
164
            params = {'exchange': queue_connection,
165
                      'client_id': QUEUE_CLIENT_ID}
166
            self.queue = self.queue_module.Queue(**params)
167
        else:
168
            class NoQueue:
169
                def send(self, *args):
170
                    pass
171
                
172
                def close(self):
173
                    pass
174
            
175
            self.queue = NoQueue()
176
    
177
    def close(self):
178
        self.wrapper.close()
179
        self.queue.close()
180
    
181
    @backend_method
182
    def list_accounts(self, user, marker=None, limit=10000):
183
        """Return a list of accounts the user can access."""
184
        
185
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
186
        allowed = self._allowed_accounts(user)
187
        start, limit = self._list_limits(allowed, marker, limit)
188
        return allowed[start:start + limit]
189
    
190
    @backend_method
191
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
192
        """Return a dictionary with the account metadata for the domain."""
193
        
194
        logger.debug("get_account_meta: %s %s %s %s", user, account, domain, until)
195
        path, node = self._lookup_account(account, user == account)
196
        if user != account:
197
            if until or node is None or account not in self._allowed_accounts(user):
198
                raise NotAllowedError
199
        try:
200
            props = self._get_properties(node, until)
201
            mtime = props[self.MTIME]
202
        except NameError:
203
            props = None
204
            mtime = until
205
        count, bytes, tstamp = self._get_statistics(node, until)
206
        tstamp = max(tstamp, mtime)
207
        if until is None:
208
            modified = tstamp
209
        else:
210
            modified = self._get_statistics(node)[2] # Overall last modification.
211
            modified = max(modified, mtime)
212
        
213
        if user != account:
214
            meta = {'name': account}
215
        else:
216
            meta = {}
217
            if props is not None and include_user_defined:
218
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
219
            if until is not None:
220
                meta.update({'until_timestamp': tstamp})
221
            meta.update({'name': account, 'count': count, 'bytes': bytes})
222
        meta.update({'modified': modified})
223
        return meta
224
    
225
    @backend_method
226
    def update_account_meta(self, user, account, domain, meta, replace=False):
227
        """Update the metadata associated with the account for the domain."""
228
        
229
        logger.debug("update_account_meta: %s %s %s %s %s", user, account, domain, meta, replace)
230
        if user != account:
231
            raise NotAllowedError
232
        path, node = self._lookup_account(account, True)
233
        self._put_metadata(user, node, domain, meta, replace)
234
    
235
    @backend_method
236
    def get_account_groups(self, user, account):
237
        """Return a dictionary with the user groups defined for this account."""
238
        
239
        logger.debug("get_account_groups: %s %s", user, account)
240
        if user != account:
241
            if account not in self._allowed_accounts(user):
242
                raise NotAllowedError
243
            return {}
244
        self._lookup_account(account, True)
245
        return self.permissions.group_dict(account)
246
    
247
    @backend_method
248
    def update_account_groups(self, user, account, groups, replace=False):
249
        """Update the groups associated with the account."""
250
        
251
        logger.debug("update_account_groups: %s %s %s %s", user, account, groups, replace)
252
        if user != account:
253
            raise NotAllowedError
254
        self._lookup_account(account, True)
255
        self._check_groups(groups)
256
        if replace:
257
            self.permissions.group_destroy(account)
258
        for k, v in groups.iteritems():
259
            if not replace: # If not already deleted.
260
                self.permissions.group_delete(account, k)
261
            if v:
262
                self.permissions.group_addmany(account, k, v)
263
    
264
    @backend_method
265
    def get_account_policy(self, user, account):
266
        """Return a dictionary with the account policy."""
267
        
268
        logger.debug("get_account_policy: %s %s", user, account)
269
        if user != account:
270
            if account not in self._allowed_accounts(user):
271
                raise NotAllowedError
272
            return {}
273
        path, node = self._lookup_account(account, True)
274
        return self._get_policy(node)
275
    
276
    @backend_method
277
    def update_account_policy(self, user, account, policy, replace=False):
278
        """Update the policy associated with the account."""
279
        
280
        logger.debug("update_account_policy: %s %s %s %s", user, account, policy, replace)
281
        if user != account:
282
            raise NotAllowedError
283
        path, node = self._lookup_account(account, True)
284
        self._check_policy(policy)
285
        self._put_policy(node, policy, replace)
286
    
287
    @backend_method
288
    def put_account(self, user, account, policy={}):
289
        """Create a new account with the given name."""
290
        
291
        logger.debug("put_account: %s %s %s", user, account, policy)
292
        if user != account:
293
            raise NotAllowedError
294
        node = self.node.node_lookup(account)
295
        if node is not None:
296
            raise AccountExists('Account already exists')
297
        if policy:
298
            self._check_policy(policy)
299
        node = self._put_path(user, self.ROOTNODE, account)
300
        self._put_policy(node, policy, True)
301
    
302
    @backend_method
303
    def delete_account(self, user, account):
304
        """Delete the account with the given name."""
305
        
306
        logger.debug("delete_account: %s %s", user, account)
307
        if user != account:
308
            raise NotAllowedError
309
        node = self.node.node_lookup(account)
310
        if node is None:
311
            return
312
        if not self.node.node_remove(node):
313
            raise AccountNotEmpty('Account is not empty')
314
        self.permissions.group_destroy(account)
315
    
316
    @backend_method
317
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
318
        """Return a list of containers existing under an account."""
319
        
320
        logger.debug("list_containers: %s %s %s %s %s %s %s", user, account, marker, limit, shared, until, public)
321
        if user != account:
322
            if until or account not in self._allowed_accounts(user):
323
                raise NotAllowedError
324
            allowed = self._allowed_containers(user, account)
325
            start, limit = self._list_limits(allowed, marker, limit)
326
            return allowed[start:start + limit]
327
        if shared or public:
328
            allowed = set()
329
            if shared:
330
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
331
            if public:
332
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
333
            allowed = sorted(allowed)
334
            start, limit = self._list_limits(allowed, marker, limit)
335
            return allowed[start:start + limit]
336
        node = self.node.node_lookup(account)
337
        containers = [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
338
        start, limit = self._list_limits([x[0] for x in containers], marker, limit)
339
        return containers[start:start + limit]
340
    
341
    @backend_method
342
    def list_container_meta(self, user, account, container, domain, until=None):
343
        """Return a list with all the container's object meta keys for the domain."""
344
        
345
        logger.debug("list_container_meta: %s %s %s %s %s", user, account, container, domain, until)
346
        allowed = []
347
        if user != account:
348
            if until:
349
                raise NotAllowedError
350
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
351
            if not allowed:
352
                raise NotAllowedError
353
        path, node = self._lookup_container(account, container)
354
        before = until if until is not None else inf
355
        allowed = self._get_formatted_paths(allowed)
356
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
357
    
358
    @backend_method
359
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
360
        """Return a dictionary with the container metadata for the domain."""
361
        
362
        logger.debug("get_container_meta: %s %s %s %s %s", user, account, container, domain, until)
363
        if user != account:
364
            if until or container not in self._allowed_containers(user, account):
365
                raise NotAllowedError
366
        path, node = self._lookup_container(account, container)
367
        props = self._get_properties(node, until)
368
        mtime = props[self.MTIME]
369
        count, bytes, tstamp = self._get_statistics(node, until)
370
        tstamp = max(tstamp, mtime)
371
        if until is None:
372
            modified = tstamp
373
        else:
374
            modified = self._get_statistics(node)[2] # Overall last modification.
375
            modified = max(modified, mtime)
376
        
377
        if user != account:
378
            meta = {'name': container}
379
        else:
380
            meta = {}
381
            if include_user_defined:
382
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
383
            if until is not None:
384
                meta.update({'until_timestamp': tstamp})
385
            meta.update({'name': container, 'count': count, 'bytes': bytes})
386
        meta.update({'modified': modified})
387
        return meta
388
    
389
    @backend_method
390
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
391
        """Update the metadata associated with the container for the domain."""
392
        
393
        logger.debug("update_container_meta: %s %s %s %s %s %s", user, account, container, domain, meta, replace)
394
        if user != account:
395
            raise NotAllowedError
396
        path, node = self._lookup_container(account, container)
397
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
398
        if src_version_id is not None:
399
            versioning = self._get_policy(node)['versioning']
400
            if versioning != 'auto':
401
                self.node.version_remove(src_version_id)
402
    
403
    @backend_method
404
    def get_container_policy(self, user, account, container):
405
        """Return a dictionary with the container policy."""
406
        
407
        logger.debug("get_container_policy: %s %s %s", user, account, container)
408
        if user != account:
409
            if container not in self._allowed_containers(user, account):
410
                raise NotAllowedError
411
            return {}
412
        path, node = self._lookup_container(account, container)
413
        return self._get_policy(node)
414
    
415
    @backend_method
416
    def update_container_policy(self, user, account, container, policy, replace=False):
417
        """Update the policy associated with the container."""
418
        
419
        logger.debug("update_container_policy: %s %s %s %s %s", user, account, container, policy, replace)
420
        if user != account:
421
            raise NotAllowedError
422
        path, node = self._lookup_container(account, container)
423
        self._check_policy(policy)
424
        self._put_policy(node, policy, replace)
425
    
426
    @backend_method
427
    def put_container(self, user, account, container, policy={}):
428
        """Create a new container with the given name."""
429
        
430
        logger.debug("put_container: %s %s %s %s", user, account, container, policy)
431
        if user != account:
432
            raise NotAllowedError
433
        try:
434
            path, node = self._lookup_container(account, container)
435
        except NameError:
436
            pass
437
        else:
438
            raise ContainerExists('Container already exists')
439
        if policy:
440
            self._check_policy(policy)
441
        path = '/'.join((account, container))
442
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
443
        self._put_policy(node, policy, True)
444
    
445
    @backend_method
446
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
447
        """Delete/purge the container with the given name."""
448
        
449
        logger.debug("delete_container: %s %s %s %s %s %s", user, account, container, until, prefix, delimiter)
450
        if user != account:
451
            raise NotAllowedError
452
        path, node = self._lookup_container(account, container)
453
        
454
        if until is not None:
455
            hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
456
            for h in hashes:
457
                self.store.map_delete(h)
458
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
459
            self._report_size_change(user, account, -size, {'action': 'container purge'})
460
            return
461
        
462
        if self._get_statistics(node)[0] > 0:
463
            raise ContainerNotEmpty('Container is not empty')
464
        hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
465
        for h in hashes:
466
            self.store.map_delete(h)
467
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
468
        self.node.node_remove(node)
469
        self._report_size_change(user, account, -size, {'action': 'container delete'})
470
    
471
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
472
        if user != account and until:
473
            raise NotAllowedError
474
        if shared and public:
475
            # get shared first
476
            shared = self._list_object_permissions(user, account, container, prefix, shared=True, public=False)
477
            objects = []
478
            if shared:
479
                path, node = self._lookup_container(account, container)
480
                shared = self._get_formatted_paths(shared)
481
                objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props)
482
            
483
            # get public
484
            objects.extend(self._list_public_object_properties(user, account, container, prefix, all_props))
485
            
486
            objects.sort(key=lambda x: x[0])
487
            start, limit = self._list_limits([x[0] for x in objects], marker, limit)
488
            return objects[start:start + limit]
489
        elif public:
490
            objects = self._list_public_object_properties(user, account, container, prefix, all_props)
491
            start, limit = self._list_limits([x[0] for x in objects], marker, limit)
492
            return objects[start:start + limit]
493
        
494
        allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
495
        if shared and not allowed:
496
            return []
497
        path, node = self._lookup_container(account, container)
498
        allowed = self._get_formatted_paths(allowed)
499
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
500
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
501
        return objects[start:start + limit]
502
    
503
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
504
        public = self._list_object_permissions(user, account, container, prefix, shared=False, public=True)
505
        paths, nodes = self._lookup_objects(public)
506
        path = '/'.join((account, container))
507
        cont_prefix = path + '/'
508
        paths = [x[len(cont_prefix):] for x in paths]
509
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
510
        objects = [(path,) + props for path, props in zip(paths, props)]
511
        return objects
512
        
513
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
514
        objects = []
515
        while True:
516
            marker = objects[-1] if objects else None
517
            limit = 10000
518
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
519
            objects.extend(l)
520
            if not l or len(l) < limit:
521
                break
522
        return objects
523
    
524
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
525
        allowed = []
526
        path = '/'.join((account, container, prefix)).rstrip('/')
527
        if user != account:
528
            allowed = self.permissions.access_list_paths(user, path)
529
            if not allowed:
530
                raise NotAllowedError
531
        else:
532
            allowed = set()
533
            if shared:
534
                allowed.update(self.permissions.access_list_shared(path))
535
            if public:
536
                allowed.update([x[0] for x in self.permissions.public_list(path)])
537
            allowed = sorted(allowed)
538
            if not allowed:
539
                return []
540
        return allowed
541
    
542
    @backend_method
543
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
544
        """Return a list of object (name, version_id) tuples existing under a container."""
545
        
546
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
547
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
548
    
549
    @backend_method
550
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
551
        """Return a list of object metadata dicts existing under a container."""
552
        
553
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
554
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
555
        objects = []
556
        for p in props:
557
            if len(p) == 2:
558
                objects.append({'subdir': p[0]})
559
            else:
560
                objects.append({'name': p[0],
561
                                'bytes': p[self.SIZE + 1],
562
                                'type': p[self.TYPE + 1],
563
                                'hash': p[self.HASH + 1],
564
                                'version': p[self.SERIAL + 1],
565
                                'version_timestamp': p[self.MTIME + 1],
566
                                'modified': p[self.MTIME + 1] if until is None else None,
567
                                'modified_by': p[self.MUSER + 1],
568
                                'uuid': p[self.UUID + 1],
569
                                'checksum': p[self.CHECKSUM + 1]})
570
        return objects
571
    
572
    @backend_method
573
    def list_object_permissions(self, user, account, container, prefix=''):
574
        """Return a list of paths that enforce permissions under a container."""
575
        
576
        logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix)
577
        return self._list_object_permissions(user, account, container, prefix, True, False)
578
    
579
    @backend_method
580
    def list_object_public(self, user, account, container, prefix=''):
581
        """Return a dict mapping paths to public ids for objects that are public under a container."""
582
        
583
        logger.debug("list_object_public: %s %s %s %s", user, account, container, prefix)
584
        public = {}
585
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
586
            public[path] = p + ULTIMATE_ANSWER
587
        return public
588
    
589
    @backend_method
590
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
591
        """Return a dictionary with the object metadata for the domain."""
592
        
593
        logger.debug("get_object_meta: %s %s %s %s %s %s", user, account, container, name, domain, version)
594
        self._can_read(user, account, container, name)
595
        path, node = self._lookup_object(account, container, name)
596
        props = self._get_version(node, version)
597
        if version is None:
598
            modified = props[self.MTIME]
599
        else:
600
            try:
601
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
602
            except NameError: # Object may be deleted.
603
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
604
                if del_props is None:
605
                    raise ItemNotExists('Object does not exist')
606
                modified = del_props[self.MTIME]
607
        
608
        meta = {}
609
        if include_user_defined:
610
            meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
611
        meta.update({'name': name,
612
                     'bytes': props[self.SIZE],
613
                     'type': props[self.TYPE],
614
                     'hash': props[self.HASH],
615
                     'version': props[self.SERIAL],
616
                     'version_timestamp': props[self.MTIME],
617
                     'modified': modified,
618
                     'modified_by': props[self.MUSER],
619
                     'uuid': props[self.UUID],
620
                     'checksum': props[self.CHECKSUM]})
621
        return meta
622
    
623
    @backend_method
624
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
625
        """Update the metadata associated with the object for the domain and return the new version."""
626
        
627
        logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace)
628
        self._can_write(user, account, container, name)
629
        path, node = self._lookup_object(account, container, name)
630
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
631
        self._apply_versioning(account, container, src_version_id)
632
        return dest_version_id
633
    
634
    @backend_method
635
    def get_object_permissions(self, user, account, container, name):
636
        """Return the action allowed on the object, the path
637
        from which the object gets its permissions from,
638
        along with a dictionary containing the permissions."""
639
        
640
        logger.debug("get_object_permissions: %s %s %s %s", user, account, container, name)
641
        allowed = 'write'
642
        permissions_path = self._get_permissions_path(account, container, name)
643
        if user != account:
644
            if self.permissions.access_check(permissions_path, self.WRITE, user):
645
                allowed = 'write'
646
            elif self.permissions.access_check(permissions_path, self.READ, user):
647
                allowed = 'read'
648
            else:
649
                raise NotAllowedError
650
        self._lookup_object(account, container, name)
651
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
652
    
653
    @backend_method
654
    def update_object_permissions(self, user, account, container, name, permissions):
655
        """Update the permissions associated with the object."""
656
        
657
        logger.debug("update_object_permissions: %s %s %s %s %s", user, account, container, name, permissions)
658
        if user != account:
659
            raise NotAllowedError
660
        path = self._lookup_object(account, container, name)[0]
661
        self._check_permissions(path, permissions)
662
        self.permissions.access_set(path, permissions)
663
        self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
664
    
665
    @backend_method
666
    def get_object_public(self, user, account, container, name):
667
        """Return the public id of the object if applicable."""
668
        
669
        logger.debug("get_object_public: %s %s %s %s", user, account, container, name)
670
        self._can_read(user, account, container, name)
671
        path = self._lookup_object(account, container, name)[0]
672
        p = self.permissions.public_get(path)
673
        if p is not None:
674
            p += ULTIMATE_ANSWER
675
        return p
676
    
677
    @backend_method
678
    def update_object_public(self, user, account, container, name, public):
679
        """Update the public status of the object."""
680
        
681
        logger.debug("update_object_public: %s %s %s %s %s", user, account, container, name, public)
682
        self._can_write(user, account, container, name)
683
        path = self._lookup_object(account, container, name)[0]
684
        if not public:
685
            self.permissions.public_unset(path)
686
        else:
687
            self.permissions.public_set(path)
688
    
689
    @backend_method
690
    def get_object_hashmap(self, user, account, container, name, version=None):
691
        """Return the object's size and a list with partial hashes."""
692
        
693
        logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version)
694
        self._can_read(user, account, container, name)
695
        path, node = self._lookup_object(account, container, name)
696
        props = self._get_version(node, version)
697
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
698
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
699
    
700
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
701
        if permissions is not None and user != account:
702
            raise NotAllowedError
703
        self._can_write(user, account, container, name)
704
        if permissions is not None:
705
            path = '/'.join((account, container, name))
706
            self._check_permissions(path, permissions)
707
        
708
        account_path, account_node = self._lookup_account(account, True)
709
        container_path, container_node = self._lookup_container(account, container)
710
        path, node = self._put_object_node(container_path, container_node, name)
711
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
712
        
713
        # Handle meta.
714
        if src_version_id is None:
715
            src_version_id = pre_version_id
716
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
717
        
718
        # Check quota.
719
        del_size = self._apply_versioning(account, container, pre_version_id)
720
        size_delta = size - del_size
721
        if size_delta > 0:
722
            account_quota = long(self._get_policy(account_node)['quota'])
723
            container_quota = long(self._get_policy(container_node)['quota'])
724
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
725
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
726
                # This must be executed in a transaction, so the version is never created if it fails.
727
                raise QuotaError
728
        self._report_size_change(user, account, size_delta, {'action': 'object update'})
729
        
730
        if permissions is not None:
731
            self.permissions.access_set(path, permissions)
732
            self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
733
        
734
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
735
        return dest_version_id
736
    
737
    @backend_method
738
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
739
        """Create/update an object with the specified size and partial hashes."""
740
        
741
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user, account, container, name, size, type, hashmap, checksum)
742
        if size == 0: # No such thing as an empty hashmap.
743
            hashmap = [self.put_block('')]
744
        map = HashMap(self.block_size, self.hash_algorithm)
745
        map.extend([binascii.unhexlify(x) for x in hashmap])
746
        missing = self.store.block_search(map)
747
        if missing:
748
            ie = IndexError()
749
            ie.data = [binascii.hexlify(x) for x in missing]
750
            raise ie
751
        
752
        hash = map.hash()
753
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
754
        self.store.map_put(hash, map)
755
        return dest_version_id
756
    
757
    @backend_method
758
    def update_object_checksum(self, user, account, container, name, version, checksum):
759
        """Update an object's checksum."""
760
        
761
        logger.debug("update_object_checksum: %s %s %s %s %s %s", user, account, container, name, version, checksum)
762
        # Update objects with greater version and same hashmap and size (fix metadata updates).
763
        self._can_write(user, account, container, name)
764
        path, node = self._lookup_object(account, container, name)
765
        props = self._get_version(node, version)
766
        versions = self.node.node_get_versions(node)
767
        for x in versions:
768
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
769
                self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
770
    
771
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
772
        dest_version_ids = []
773
        self._can_read(user, src_account, src_container, src_name)
774
        path, node = self._lookup_object(src_account, src_container, src_name)
775
        # TODO: Will do another fetch of the properties in duplicate version...
776
        props = self._get_version(node, src_version) # Check to see if source exists.
777
        src_version_id = props[self.SERIAL]
778
        hash = props[self.HASH]
779
        size = props[self.SIZE]
780
        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
781
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
782
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
783
                self._delete_object(user, src_account, src_container, src_name)
784
        
785
        if delimiter:
786
            prefix = src_name + delimiter if not src_name.endswith(delimiter) else src_name
787
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
788
            paths = [elem[0] for elem in src_names]
789
            nodes = [elem[2] for elem in src_names]
790
            # TODO: Will do another fetch of the properties in duplicate version...
791
            props = self._get_versions(nodes) # Check to see if source exists.
792
            
793
            for prop, path, node in zip(props, paths, nodes):
794
                src_version_id = prop[self.SERIAL]
795
                hash = prop[self.HASH]
796
                vtype = prop[self.TYPE]
797
                dest_prefix = dest_name + delimiter if not dest_name.endswith(delimiter) else dest_name
798
                vdest_name = path.replace(prefix, dest_prefix, 1)
799
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
800
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
801
                        self._delete_object(user, src_account, src_container, path)
802
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
803
    
804
    @backend_method
805
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
806
        """Copy an object's data and metadata."""
807
        
808
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
809
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
810
        return dest_version_id
811
    
812
    @backend_method
813
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
814
        """Move an object's data and metadata."""
815
        
816
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
817
        if user != src_account:
818
            raise NotAllowedError
819
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
820
        return dest_version_id
821
    
822
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
823
        if user != account:
824
            raise NotAllowedError
825
        
826
        if until is not None:
827
            path = '/'.join((account, container, name))
828
            node = self.node.node_lookup(path)
829
            if node is None:
830
                return
831
            hashes = []
832
            size = 0
833
            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
834
            hashes += h
835
            size += s
836
            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
837
            hashes += h
838
            size += s
839
            for h in hashes:
840
                self.store.map_delete(h)
841
            self.node.node_purge(node, until, CLUSTER_DELETED)
842
            try:
843
                props = self._get_version(node)
844
            except NameError:
845
                self.permissions.access_clear(path)
846
            self._report_size_change(user, account, -size, {'action': 'object purge'})
847
            return
848
        
849
        path, node = self._lookup_object(account, container, name)
850
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
851
        del_size = self._apply_versioning(account, container, src_version_id)
852
        if del_size:
853
            self._report_size_change(user, account, -del_size, {'action': 'object delete'})
854
        self._report_object_change(user, account, path, details={'action': 'object delete'})
855
        self.permissions.access_clear(path)
856
        
857
        if delimiter:
858
            prefix = name + delimiter if not name.endswith(delimiter) else name
859
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
860
            paths = []
861
            for t in src_names:
862
                    path = '/'.join((account, container, t[0]))
863
                    node = t[2]
864
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
865
                del_size = self._apply_versioning(account, container, src_version_id)
866
                if del_size:
867
                    self._report_size_change(user, account, -del_size, {'action': 'object delete'})
868
                self._report_object_change(user, account, path, details={'action': 'object delete'})
869
                paths.append(path)
870
            self.permissions.access_clear_bulk(paths)
871
    
872
    @backend_method
873
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
874
        """Delete/purge an object."""
875
        
876
        logger.debug("delete_object: %s %s %s %s %s %s %s", user, account, container, name, until, prefix, delimiter)
877
        self._delete_object(user, account, container, name, until, delimiter)
878
    
879
    @backend_method
880
    def list_versions(self, user, account, container, name):
881
        """Return a list of all (version, version_timestamp) tuples for an object."""
882
        
883
        logger.debug("list_versions: %s %s %s %s", user, account, container, name)
884
        self._can_read(user, account, container, name)
885
        path, node = self._lookup_object(account, container, name)
886
        versions = self.node.node_get_versions(node)
887
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
888
    
889
    @backend_method
890
    def get_uuid(self, user, uuid):
891
        """Return the (account, container, name) for the UUID given."""
892
        
893
        logger.debug("get_uuid: %s %s", user, uuid)
894
        info = self.node.latest_uuid(uuid)
895
        if info is None:
896
            raise NameError
897
        path, serial = info
898
        account, container, name = path.split('/', 2)
899
        self._can_read(user, account, container, name)
900
        return (account, container, name)
901
    
902
    @backend_method
903
    def get_public(self, user, public):
904
        """Return the (account, container, name) for the public id given."""
905
        
906
        logger.debug("get_public: %s %s", user, public)
907
        if public is None or public < ULTIMATE_ANSWER:
908
            raise NameError
909
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
910
        if path is None:
911
            raise NameError
912
        account, container, name = path.split('/', 2)
913
        self._can_read(user, account, container, name)
914
        return (account, container, name)
915
    
916
    @backend_method(autocommit=0)
917
    def get_block(self, hash):
918
        """Return a block's data."""
919
        
920
        logger.debug("get_block: %s", hash)
921
        block = self.store.block_get(binascii.unhexlify(hash))
922
        if not block:
923
            raise ItemNotExists('Block does not exist')
924
        return block
925
    
926
    @backend_method(autocommit=0)
927
    def put_block(self, data):
928
        """Store a block and return the hash."""
929
        
930
        logger.debug("put_block: %s", len(data))
931
        return binascii.hexlify(self.store.block_put(data))
932
    
933
    @backend_method(autocommit=0)
934
    def update_block(self, hash, data, offset=0):
935
        """Update a known block and return the hash."""
936
        
937
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
938
        if offset == 0 and len(data) == self.block_size:
939
            return self.put_block(data)
940
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
941
        return binascii.hexlify(h)
942
    
943
    # Path functions.
944
    
945
    def _generate_uuid(self):
946
        return str(uuidlib.uuid4())
947
    
948
    def _put_object_node(self, path, parent, name):
949
        path = '/'.join((path, name))
950
        node = self.node.node_lookup(path)
951
        if node is None:
952
            node = self.node.node_create(parent, path)
953
        return path, node
954
    
955
    def _put_path(self, user, parent, path):
956
        node = self.node.node_create(parent, path)
957
        self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
958
        return node
959
    
960
    def _lookup_account(self, account, create=True):
961
        node = self.node.node_lookup(account)
962
        if node is None and create:
963
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
964
        return account, node
965
    
966
    def _lookup_container(self, account, container):
967
        path = '/'.join((account, container))
968
        node = self.node.node_lookup(path)
969
        if node is None:
970
            raise ItemNotExists('Container does not exist')
971
        return path, node
972
    
973
    def _lookup_object(self, account, container, name):
974
        path = '/'.join((account, container, name))
975
        node = self.node.node_lookup(path)
976
        if node is None:
977
            raise ItemNotExists('Object does not exist')
978
        return path, node
979
    
980
    def _lookup_objects(self, paths):
981
        nodes = self.node.node_lookup_bulk(paths)
982
        return paths, nodes
983
    
984
    def _get_properties(self, node, until=None):
985
        """Return properties until the timestamp given."""
986
        
987
        before = until if until is not None else inf
988
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
989
        if props is None and until is not None:
990
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
991
        if props is None:
992
            raise ItemNotExists('Path does not exist')
993
        return props
994
    
995
    def _get_statistics(self, node, until=None):
996
        """Return count, sum of size and latest timestamp of everything under node."""
997
        
998
        if until is None:
999
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1000
        else:
1001
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1002
        if stats is None:
1003
            stats = (0, 0, 0)
1004
        return stats
1005
    
1006
    def _get_version(self, node, version=None):
1007
        if version is None:
1008
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1009
            if props is None:
1010
                raise ItemNotExists('Object does not exist')
1011
        else:
1012
            try:
1013
                version = int(version)
1014
            except ValueError:
1015
                raise VersionNotExists('Version does not exist')
1016
            props = self.node.version_get_properties(version)
1017
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1018
                raise VersionNotExists('Version does not exist')
1019
        return props
1020

    
1021
    def _get_versions(self, nodes):
1022
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1023
    
1024
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1025
        """Create a new version of the node."""
1026
        
1027
        props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1028
        if props is not None:
1029
            src_version_id = props[self.SERIAL]
1030
            src_hash = props[self.HASH]
1031
            src_size = props[self.SIZE]
1032
            src_type = props[self.TYPE]
1033
            src_checksum = props[self.CHECKSUM]
1034
        else:
1035
            src_version_id = None
1036
            src_hash = None
1037
            src_size = 0
1038
            src_type = ''
1039
            src_checksum = ''
1040
        if size is None: # Set metadata.
1041
            hash = src_hash # This way hash can be set to None (account or container).
1042
            size = src_size
1043
        if type is None:
1044
            type = src_type
1045
        if checksum is None:
1046
            checksum = src_checksum
1047
        uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
1048
        
1049
        if src_node is None:
1050
            pre_version_id = src_version_id
1051
        else:
1052
            pre_version_id = None
1053
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1054
            if props is not None:
1055
                pre_version_id = props[self.SERIAL]
1056
        if pre_version_id is not None:
1057
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1058
        
1059
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1060
        return pre_version_id, dest_version_id
1061
    
1062
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1063
        if src_version_id is not None:
1064
            self.node.attribute_copy(src_version_id, dest_version_id)
1065
        if not replace:
1066
            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
1067
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
1068
        else:
1069
            self.node.attribute_del(dest_version_id, domain)
1070
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
1071
    
1072
    def _put_metadata(self, user, node, domain, meta, replace=False):
1073
        """Create a new version and store metadata."""
1074
        
1075
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
1076
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
1077
        return src_version_id, dest_version_id
1078
    
1079
    def _list_limits(self, listing, marker, limit):
1080
        start = 0
1081
        if marker:
1082
            try:
1083
                start = listing.index(marker) + 1
1084
            except ValueError:
1085
                pass
1086
        if not limit or limit > 10000:
1087
            limit = 10000
1088
        return start, limit
1089
    
1090
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1091
        cont_prefix = path + '/'
1092
        prefix = cont_prefix + prefix
1093
        start = cont_prefix + marker if marker else None
1094
        before = until if until is not None else inf
1095
        filterq = keys if domain else []
1096
        sizeq = size_range
1097
        
1098
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1099
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1100
        objects.sort(key=lambda x: x[0])
1101
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1102
        return objects
1103
        
1104
    # Reporting functions.
1105
    
1106
    def _report_size_change(self, user, account, size, details={}):
1107
        logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1108
        account_node = self._lookup_account(account, True)[1]
1109
        total = self._get_statistics(account_node)[1]
1110
        details.update({'user': user, 'total': total})
1111
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1112
    
1113
    def _report_object_change(self, user, account, path, details={}):
1114
        logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1115
        details.update({'user': user})
1116
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1117
    
1118
    def _report_sharing_change(self, user, account, path, details={}):
1119
        logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1120
        details.update({'user': user})
1121
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1122
    
1123
    # Policy functions.
1124
    
1125
    def _check_policy(self, policy):
1126
        for k in policy.keys():
1127
            if policy[k] == '':
1128
                policy[k] = self.default_policy.get(k)
1129
        for k, v in policy.iteritems():
1130
            if k == 'quota':
1131
                q = int(v) # May raise ValueError.
1132
                if q < 0:
1133
                    raise ValueError
1134
            elif k == 'versioning':
1135
                if v not in ['auto', 'none']:
1136
                    raise ValueError
1137
            else:
1138
                raise ValueError
1139
    
1140
    def _put_policy(self, node, policy, replace):
1141
        if replace:
1142
            for k, v in self.default_policy.iteritems():
1143
                if k not in policy:
1144
                    policy[k] = v
1145
        self.node.policy_set(node, policy)
1146
    
1147
    def _get_policy(self, node):
1148
        policy = self.default_policy.copy()
1149
        policy.update(self.node.policy_get(node))
1150
        return policy
1151
    
1152
    def _apply_versioning(self, account, container, version_id):
1153
        """Delete the provided version if such is the policy.
1154
           Return size of object removed.
1155
        """
1156
        
1157
        if version_id is None:
1158
            return 0
1159
        path, node = self._lookup_container(account, container)
1160
        versioning = self._get_policy(node)['versioning']
1161
        if versioning != 'auto':
1162
            hash, size = self.node.version_remove(version_id)
1163
            self.store.map_delete(hash)
1164
            return size
1165
        return 0
1166
    
1167
    # Access control functions.
1168
    
1169
    def _check_groups(self, groups):
1170
        # raise ValueError('Bad characters in groups')
1171
        pass
1172
    
1173
    def _check_permissions(self, path, permissions):
1174
        # raise ValueError('Bad characters in permissions')
1175
        pass
1176
    
1177
    def _get_formatted_paths(self, paths):
1178
        formatted = []
1179
        for p in paths:
1180
            node = self.node.node_lookup(p)
1181
            if node is not None:
1182
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1183
            if props is not None:
1184
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1185
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1186
                formatted.append((p, self.MATCH_EXACT))
1187
        return formatted
1188
    
1189
    def _get_permissions_path(self, account, container, name):
1190
        path = '/'.join((account, container, name))
1191
        permission_paths = self.permissions.access_inherit(path)
1192
        permission_paths.sort()
1193
        permission_paths.reverse()
1194
        for p in permission_paths:
1195
            if p == path:
1196
                return p
1197
            else:
1198
                if p.count('/') < 2:
1199
                    continue
1200
                node = self.node.node_lookup(p)
1201
                if node is not None:
1202
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1203
                if props is not None:
1204
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1205
                        return p
1206
        return None
1207
    
1208
    def _can_read(self, user, account, container, name):
1209
        if user == account:
1210
            return True
1211
        path = '/'.join((account, container, name))
1212
        if self.permissions.public_get(path) is not None:
1213
            return True
1214
        path = self._get_permissions_path(account, container, name)
1215
        if not path:
1216
            raise NotAllowedError
1217
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1218
            raise NotAllowedError
1219
    
1220
    def _can_write(self, user, account, container, name):
1221
        if user == account:
1222
            return True
1223
        path = '/'.join((account, container, name))
1224
        path = self._get_permissions_path(account, container, name)
1225
        if not path:
1226
            raise NotAllowedError
1227
        if not self.permissions.access_check(path, self.WRITE, user):
1228
            raise NotAllowedError
1229
    
1230
    def _allowed_accounts(self, user):
1231
        allow = set()
1232
        for path in self.permissions.access_list_paths(user):
1233
            allow.add(path.split('/', 1)[0])
1234
        return sorted(allow)
1235
    
1236
    def _allowed_containers(self, user, account):
1237
        allow = set()
1238
        for path in self.permissions.access_list_paths(user, account):
1239
            allow.add(path.split('/', 2)[1])
1240
        return sorted(allow)