Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ cf4a7a7b

History | View | Annotate | Download (56.9 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43

    
44
# Stripped-down version of the HashMap class found in tools.
45
class HashMap(list):
46

    
47
    def __init__(self, blocksize, blockhash):
48
        super(HashMap, self).__init__()
49
        self.blocksize = blocksize
50
        self.blockhash = blockhash
51

    
52
    def _hash_raw(self, v):
53
        h = hashlib.new(self.blockhash)
54
        h.update(v)
55
        return h.digest()
56

    
57
    def hash(self):
58
        if len(self) == 0:
59
            return self._hash_raw('')
60
        if len(self) == 1:
61
            return self.__getitem__(0)
62

    
63
        h = list(self)
64
        s = 2
65
        while s < len(h):
66
            s = s * 2
67
        h += [('\x00' * len(h[0]))] * (s - len(h))
68
        while len(h) > 1:
69
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
70
        return h[0]
71

    
72
# Default modules and settings.
73
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
74
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
75
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
76
DEFAULT_BLOCK_PATH = 'data/'
77
DEFAULT_BLOCK_UMASK = 0o022
78
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
79
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
80

    
81
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
82
QUEUE_CLIENT_ID = 'pithos'
83
QUEUE_INSTANCE_ID = '1'
84

    
85
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
86

    
87
inf = float('inf')
88

    
89
ULTIMATE_ANSWER = 42
90

    
91

    
92
logger = logging.getLogger(__name__)
93

    
94

    
95
def backend_method(func=None, autocommit=1):
96
    if func is None:
97
        def fn(func):
98
            return backend_method(func, autocommit)
99
        return fn
100

    
101
    if not autocommit:
102
        return func
103
    def fn(self, *args, **kw):
104
        self.wrapper.execute()
105
        try:
106
            self.messages = []
107
            ret = func(self, *args, **kw)
108
            for m in self.messages:
109
                self.queue.send(*m)
110
            self.wrapper.commit()
111
            return ret
112
        except:
113
            self.wrapper.rollback()
114
            raise
115
    return fn
116

    
117

    
118
class ModularBackend(BaseBackend):
119
    """A modular backend.
120
    
121
    Uses modules for SQL functions and storage.
122
    """
123
    
124
    def __init__(self, db_module=None, db_connection=None,
125
                 block_module=None, block_path=None, block_umask=None,
126
                 queue_module=None, queue_connection=None):
127
        db_module = db_module or DEFAULT_DB_MODULE
128
        db_connection = db_connection or DEFAULT_DB_CONNECTION
129
        block_module = block_module or DEFAULT_BLOCK_MODULE
130
        block_path = block_path or DEFAULT_BLOCK_PATH
131
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
132
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
133
        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
134
        
135
        self.hash_algorithm = 'sha256'
136
        self.block_size = 4 * 1024 * 1024 # 4MB
137
        
138
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
139
        
140
        def load_module(m):
141
            __import__(m)
142
            return sys.modules[m]
143
        
144
        self.db_module = load_module(db_module)
145
        self.wrapper = self.db_module.DBWrapper(db_connection)
146
        params = {'wrapper': self.wrapper}
147
        self.permissions = self.db_module.Permissions(**params)
148
        for x in ['READ', 'WRITE']:
149
            setattr(self, x, getattr(self.db_module, x))
150
        self.node = self.db_module.Node(**params)
151
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
152
            setattr(self, x, getattr(self.db_module, x))
153
        
154
        self.block_module = load_module(block_module)
155
        params = {'path': block_path,
156
                  'block_size': self.block_size,
157
                  'hash_algorithm': self.hash_algorithm,
158
                  'umask': block_umask}
159
        self.store = self.block_module.Store(**params)
160

    
161
        if queue_module and queue_connection:
162
            self.queue_module = load_module(queue_module)
163
            params = {'exchange': queue_connection,
164
                      'client_id': QUEUE_CLIENT_ID}
165
            self.queue = self.queue_module.Queue(**params)
166
        else:
167
            class NoQueue:
168
                def send(self, *args):
169
                    pass
170
                
171
                def close(self):
172
                    pass
173
            
174
            self.queue = NoQueue()
175
    
176
    def close(self):
177
        self.wrapper.close()
178
        self.queue.close()
179
    
180
    @backend_method
181
    def list_accounts(self, user, marker=None, limit=10000):
182
        """Return a list of accounts the user can access."""
183
        
184
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
185
        allowed = self._allowed_accounts(user)
186
        start, limit = self._list_limits(allowed, marker, limit)
187
        return allowed[start:start + limit]
188
    
189
    @backend_method
190
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
191
        """Return a dictionary with the account metadata for the domain."""
192
        
193
        logger.debug("get_account_meta: %s %s %s %s", user, account, domain, until)
194
        path, node = self._lookup_account(account, user == account)
195
        if user != account:
196
            if until or node is None or account not in self._allowed_accounts(user):
197
                raise NotAllowedError
198
        try:
199
            props = self._get_properties(node, until)
200
            mtime = props[self.MTIME]
201
        except NameError:
202
            props = None
203
            mtime = until
204
        count, bytes, tstamp = self._get_statistics(node, until)
205
        tstamp = max(tstamp, mtime)
206
        if until is None:
207
            modified = tstamp
208
        else:
209
            modified = self._get_statistics(node)[2] # Overall last modification.
210
            modified = max(modified, mtime)
211
        
212
        if user != account:
213
            meta = {'name': account}
214
        else:
215
            meta = {}
216
            if props is not None and include_user_defined:
217
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
218
            if until is not None:
219
                meta.update({'until_timestamp': tstamp})
220
            meta.update({'name': account, 'count': count, 'bytes': bytes})
221
        meta.update({'modified': modified})
222
        return meta
223
    
224
    @backend_method
225
    def update_account_meta(self, user, account, domain, meta, replace=False):
226
        """Update the metadata associated with the account for the domain."""
227
        
228
        logger.debug("update_account_meta: %s %s %s %s %s", user, account, domain, meta, replace)
229
        if user != account:
230
            raise NotAllowedError
231
        path, node = self._lookup_account(account, True)
232
        self._put_metadata(user, node, domain, meta, replace)
233
    
234
    @backend_method
235
    def get_account_groups(self, user, account):
236
        """Return a dictionary with the user groups defined for this account."""
237
        
238
        logger.debug("get_account_groups: %s %s", user, account)
239
        if user != account:
240
            if account not in self._allowed_accounts(user):
241
                raise NotAllowedError
242
            return {}
243
        self._lookup_account(account, True)
244
        return self.permissions.group_dict(account)
245
    
246
    @backend_method
247
    def update_account_groups(self, user, account, groups, replace=False):
248
        """Update the groups associated with the account."""
249
        
250
        logger.debug("update_account_groups: %s %s %s %s", user, account, groups, replace)
251
        if user != account:
252
            raise NotAllowedError
253
        self._lookup_account(account, True)
254
        self._check_groups(groups)
255
        if replace:
256
            self.permissions.group_destroy(account)
257
        for k, v in groups.iteritems():
258
            if not replace: # If not already deleted.
259
                self.permissions.group_delete(account, k)
260
            if v:
261
                self.permissions.group_addmany(account, k, v)
262
    
263
    @backend_method
264
    def get_account_policy(self, user, account):
265
        """Return a dictionary with the account policy."""
266
        
267
        logger.debug("get_account_policy: %s %s", user, account)
268
        if user != account:
269
            if account not in self._allowed_accounts(user):
270
                raise NotAllowedError
271
            return {}
272
        path, node = self._lookup_account(account, True)
273
        return self._get_policy(node)
274
    
275
    @backend_method
276
    def update_account_policy(self, user, account, policy, replace=False):
277
        """Update the policy associated with the account."""
278
        
279
        logger.debug("update_account_policy: %s %s %s %s", user, account, policy, replace)
280
        if user != account:
281
            raise NotAllowedError
282
        path, node = self._lookup_account(account, True)
283
        self._check_policy(policy)
284
        self._put_policy(node, policy, replace)
285
    
286
    @backend_method
287
    def put_account(self, user, account, policy={}):
288
        """Create a new account with the given name."""
289
        
290
        logger.debug("put_account: %s %s %s", user, account, policy)
291
        if user != account:
292
            raise NotAllowedError
293
        node = self.node.node_lookup(account)
294
        if node is not None:
295
            raise NameError('Account already exists')
296
        if policy:
297
            self._check_policy(policy)
298
        node = self._put_path(user, self.ROOTNODE, account)
299
        self._put_policy(node, policy, True)
300
    
301
    @backend_method
302
    def delete_account(self, user, account):
303
        """Delete the account with the given name."""
304
        
305
        logger.debug("delete_account: %s %s", user, account)
306
        if user != account:
307
            raise NotAllowedError
308
        node = self.node.node_lookup(account)
309
        if node is None:
310
            return
311
        if not self.node.node_remove(node):
312
            raise IndexError('Account is not empty')
313
        self.permissions.group_destroy(account)
314
    
315
    @backend_method
316
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
317
        """Return a list of containers existing under an account."""
318
        
319
        logger.debug("list_containers: %s %s %s %s %s %s %s", user, account, marker, limit, shared, until, public)
320
        if user != account:
321
            if until or account not in self._allowed_accounts(user):
322
                raise NotAllowedError
323
            allowed = self._allowed_containers(user, account)
324
            start, limit = self._list_limits(allowed, marker, limit)
325
            return allowed[start:start + limit]
326
        if shared or public:
327
            allowed = []
328
            if shared:
329
                allowed.extend([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
330
            if public:
331
                allowed.extend([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
332
            allowed = list(set(allowed))
333
            allowed.sort()
334
            start, limit = self._list_limits(allowed, marker, limit)
335
            return allowed[start:start + limit]
336
        node = self.node.node_lookup(account)
337
        containers = [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
338
        start, limit = self._list_limits([x[0] for x in containers], marker, limit)
339
        return containers[start:start + limit]
340
    
341
    @backend_method
342
    def list_container_meta(self, user, account, container, domain, until=None):
343
        """Return a list with all the container's object meta keys for the domain."""
344
        
345
        logger.debug("list_container_meta: %s %s %s %s %s", user, account, container, domain, until)
346
        allowed = []
347
        if user != account:
348
            if until:
349
                raise NotAllowedError
350
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
351
            if not allowed:
352
                raise NotAllowedError
353
        path, node = self._lookup_container(account, container)
354
        before = until if until is not None else inf
355
        allowed = self._get_formatted_paths(allowed)
356
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
357
    
358
    @backend_method
359
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
360
        """Return a dictionary with the container metadata for the domain."""
361
        
362
        logger.debug("get_container_meta: %s %s %s %s %s", user, account, container, domain, until)
363
        if user != account:
364
            if until or container not in self._allowed_containers(user, account):
365
                raise NotAllowedError
366
        path, node = self._lookup_container(account, container)
367
        props = self._get_properties(node, until)
368
        mtime = props[self.MTIME]
369
        count, bytes, tstamp = self._get_statistics(node, until)
370
        tstamp = max(tstamp, mtime)
371
        if until is None:
372
            modified = tstamp
373
        else:
374
            modified = self._get_statistics(node)[2] # Overall last modification.
375
            modified = max(modified, mtime)
376
        
377
        if user != account:
378
            meta = {'name': container}
379
        else:
380
            meta = {}
381
            if include_user_defined:
382
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
383
            if until is not None:
384
                meta.update({'until_timestamp': tstamp})
385
            meta.update({'name': container, 'count': count, 'bytes': bytes})
386
        meta.update({'modified': modified})
387
        return meta
388
    
389
    @backend_method
390
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
391
        """Update the metadata associated with the container for the domain."""
392
        
393
        logger.debug("update_container_meta: %s %s %s %s %s %s", user, account, container, domain, meta, replace)
394
        if user != account:
395
            raise NotAllowedError
396
        path, node = self._lookup_container(account, container)
397
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
398
        if src_version_id is not None:
399
            versioning = self._get_policy(node)['versioning']
400
            if versioning != 'auto':
401
                self.node.version_remove(src_version_id)
402
    
403
    @backend_method
404
    def get_container_policy(self, user, account, container):
405
        """Return a dictionary with the container policy."""
406
        
407
        logger.debug("get_container_policy: %s %s %s", user, account, container)
408
        if user != account:
409
            if container not in self._allowed_containers(user, account):
410
                raise NotAllowedError
411
            return {}
412
        path, node = self._lookup_container(account, container)
413
        return self._get_policy(node)
414
    
415
    @backend_method
416
    def update_container_policy(self, user, account, container, policy, replace=False):
417
        """Update the policy associated with the container."""
418
        
419
        logger.debug("update_container_policy: %s %s %s %s %s", user, account, container, policy, replace)
420
        if user != account:
421
            raise NotAllowedError
422
        path, node = self._lookup_container(account, container)
423
        self._check_policy(policy)
424
        self._put_policy(node, policy, replace)
425
    
426
    @backend_method
427
    def put_container(self, user, account, container, policy={}):
428
        """Create a new container with the given name."""
429
        
430
        logger.debug("put_container: %s %s %s %s", user, account, container, policy)
431
        if user != account:
432
            raise NotAllowedError
433
        try:
434
            path, node = self._lookup_container(account, container)
435
        except NameError:
436
            pass
437
        else:
438
            raise NameError('Container already exists')
439
        if policy:
440
            self._check_policy(policy)
441
        path = '/'.join((account, container))
442
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
443
        self._put_policy(node, policy, True)
444
    
445
    @backend_method
446
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
447
        """Delete/purge the container with the given name."""
448
        
449
        logger.debug("delete_container: %s %s %s %s %s %s", user, account, container, until, prefix, delimiter)
450
        if user != account:
451
            raise NotAllowedError
452
        path, node = self._lookup_container(account, container)
453
        
454
        if until is not None:
455
            hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
456
            for h in hashes:
457
                self.store.map_delete(h)
458
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
459
            self._report_size_change(user, account, -size, {'action': 'container purge'})
460
            return
461
        
462
        if self._get_statistics(node)[0] > 0:
463
            raise IndexError('Container is not empty')
464
        hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
465
        for h in hashes:
466
            self.store.map_delete(h)
467
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
468
        self.node.node_remove(node)
469
        self._report_size_change(user, account, -size, {'action': 'container delete'})
470
    
471
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
472
        if user != account and until:
473
            raise NotAllowedError
474
        if shared and public:
475
            # get shared first
476
            shared = self._list_object_permissions(user, account, container, prefix, shared=True, public=False)
477
            objects = []
478
            if shared:
479
                path, node = self._lookup_container(account, container)
480
                shared = self._get_formatted_paths(shared)
481
                objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props)
482
            
483
            # get public
484
            objects.extend(self._list_public_object_properties(user, account, container, prefix, all_props))
485
            
486
            objects.sort(key=lambda x: x[0])
487
            start, limit = self._list_limits([x[0] for x in objects], marker, limit)
488
            return objects[start:start + limit]
489
        elif public:
490
            objects = self._list_public_object_properties(user, account, container, prefix, all_props)
491
            start, limit = self._list_limits([x[0] for x in objects], marker, limit)
492
            return objects[start:start + limit]
493
        
494
        allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
495
        if shared and not allowed:
496
            return []
497
        path, node = self._lookup_container(account, container)
498
        allowed = self._get_formatted_paths(allowed)
499
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
500
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
501
        return objects[start:start + limit]
502
    
503
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
504
        public = self._list_object_permissions(user, account, container, prefix, shared=False, public=True)
505
        paths, nodes = self._lookup_objects(public)
506
        path = '/'.join((account, container))
507
        cont_prefix = path + '/'
508
        paths = [x[len(cont_prefix):] for x in paths]
509
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
510
        objects = [(path,) + props for path, props in zip(paths, props)]
511
        return objects
512
        
513
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
514
        objects = []
515
        while True:
516
            marker = objects[-1] if objects else None
517
            limit = 10000
518
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
519
            objects.extend(l)
520
            if not l or len(l) < limit:
521
                break
522
        return objects
523
    
524
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
525
        allowed = []
526
        path = '/'.join((account, container, prefix)).rstrip('/')
527
        if user != account:
528
            allowed = self.permissions.access_list_paths(user, path)
529
            if not allowed:
530
                raise NotAllowedError
531
        else:
532
            allowed = []
533
            if shared:
534
                allowed.extend(self.permissions.access_list_shared(path))
535
            if public:
536
                allowed.extend([x[0] for x in self.permissions.public_list(path)])
537
            allowed = list(set(allowed))
538
            allowed.sort()
539
            if not allowed:
540
                return []
541
        return allowed
542
    
543
    @backend_method
544
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
545
        """Return a list of object (name, version_id) tuples existing under a container."""
546
        
547
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
548
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
549
    
550
    @backend_method
551
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
552
        """Return a list of object metadata dicts existing under a container."""
553
        
554
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
555
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
556
        objects = []
557
        for p in props:
558
            if len(p) == 2:
559
                objects.append({'subdir': p[0]})
560
            else:
561
                objects.append({'name': p[0],
562
                                'bytes': p[self.SIZE + 1],
563
                                'type': p[self.TYPE + 1],
564
                                'hash': p[self.HASH + 1],
565
                                'version': p[self.SERIAL + 1],
566
                                'version_timestamp': p[self.MTIME + 1],
567
                                'modified': p[self.MTIME + 1] if until is None else None,
568
                                'modified_by': p[self.MUSER + 1],
569
                                'uuid': p[self.UUID + 1],
570
                                'checksum': p[self.CHECKSUM + 1]})
571
        return objects
572
    
573
    @backend_method
574
    def list_object_permissions(self, user, account, container, prefix=''):
575
        """Return a list of paths that enforce permissions under a container."""
576
        
577
        logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix)
578
        return self._list_object_permissions(user, account, container, prefix, True, False)
579
    
580
    @backend_method
581
    def list_object_public(self, user, account, container, prefix=''):
582
        """Return a dict mapping paths to public ids for objects that are public under a container."""
583
        
584
        logger.debug("list_object_public: %s %s %s %s", user, account, container, prefix)
585
        public = {}
586
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
587
            public[path] = p + ULTIMATE_ANSWER
588
        return public
589
    
590
    @backend_method
591
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
592
        """Return a dictionary with the object metadata for the domain."""
593
        
594
        logger.debug("get_object_meta: %s %s %s %s %s %s", user, account, container, name, domain, version)
595
        self._can_read(user, account, container, name)
596
        path, node = self._lookup_object(account, container, name)
597
        props = self._get_version(node, version)
598
        if version is None:
599
            modified = props[self.MTIME]
600
        else:
601
            try:
602
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
603
            except NameError: # Object may be deleted.
604
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
605
                if del_props is None:
606
                    raise NameError('Object does not exist')
607
                modified = del_props[self.MTIME]
608
        
609
        meta = {}
610
        if include_user_defined:
611
            meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
612
        meta.update({'name': name,
613
                     'bytes': props[self.SIZE],
614
                     'type': props[self.TYPE],
615
                     'hash': props[self.HASH],
616
                     'version': props[self.SERIAL],
617
                     'version_timestamp': props[self.MTIME],
618
                     'modified': modified,
619
                     'modified_by': props[self.MUSER],
620
                     'uuid': props[self.UUID],
621
                     'checksum': props[self.CHECKSUM]})
622
        return meta
623
    
624
    @backend_method
625
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
626
        """Update the metadata associated with the object for the domain and return the new version."""
627
        
628
        logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace)
629
        self._can_write(user, account, container, name)
630
        path, node = self._lookup_object(account, container, name)
631
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
632
        self._apply_versioning(account, container, src_version_id)
633
        return dest_version_id
634
    
635
    @backend_method
636
    def get_object_permissions(self, user, account, container, name):
637
        """Return the action allowed on the object, the path
638
        from which the object gets its permissions from,
639
        along with a dictionary containing the permissions."""
640
        
641
        logger.debug("get_object_permissions: %s %s %s %s", user, account, container, name)
642
        allowed = 'write'
643
        permissions_path = self._get_permissions_path(account, container, name)
644
        if user != account:
645
            if self.permissions.access_check(permissions_path, self.WRITE, user):
646
                allowed = 'write'
647
            elif self.permissions.access_check(permissions_path, self.READ, user):
648
                allowed = 'read'
649
            else:
650
                raise NotAllowedError
651
        self._lookup_object(account, container, name)
652
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
653
    
654
    @backend_method
655
    def update_object_permissions(self, user, account, container, name, permissions):
656
        """Update the permissions associated with the object."""
657
        
658
        logger.debug("update_object_permissions: %s %s %s %s %s", user, account, container, name, permissions)
659
        if user != account:
660
            raise NotAllowedError
661
        path = self._lookup_object(account, container, name)[0]
662
        self._check_permissions(path, permissions)
663
        self.permissions.access_set(path, permissions)
664
        self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
665
    
666
    @backend_method
667
    def get_object_public(self, user, account, container, name):
668
        """Return the public id of the object if applicable."""
669
        
670
        logger.debug("get_object_public: %s %s %s %s", user, account, container, name)
671
        self._can_read(user, account, container, name)
672
        path = self._lookup_object(account, container, name)[0]
673
        p = self.permissions.public_get(path)
674
        if p is not None:
675
            p += ULTIMATE_ANSWER
676
        return p
677
    
678
    @backend_method
679
    def update_object_public(self, user, account, container, name, public):
680
        """Update the public status of the object."""
681
        
682
        logger.debug("update_object_public: %s %s %s %s %s", user, account, container, name, public)
683
        self._can_write(user, account, container, name)
684
        path = self._lookup_object(account, container, name)[0]
685
        if not public:
686
            self.permissions.public_unset(path)
687
        else:
688
            self.permissions.public_set(path)
689
    
690
    @backend_method
691
    def get_object_hashmap(self, user, account, container, name, version=None):
692
        """Return the object's size and a list with partial hashes."""
693
        
694
        logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version)
695
        self._can_read(user, account, container, name)
696
        path, node = self._lookup_object(account, container, name)
697
        props = self._get_version(node, version)
698
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
699
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
700
    
701
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
702
        if permissions is not None and user != account:
703
            raise NotAllowedError
704
        self._can_write(user, account, container, name)
705
        if permissions is not None:
706
            path = '/'.join((account, container, name))
707
            self._check_permissions(path, permissions)
708
        
709
        account_path, account_node = self._lookup_account(account, True)
710
        container_path, container_node = self._lookup_container(account, container)
711
        path, node = self._put_object_node(container_path, container_node, name)
712
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
713
        
714
        # Handle meta.
715
        if src_version_id is None:
716
            src_version_id = pre_version_id
717
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
718
        
719
        # Check quota.
720
        del_size = self._apply_versioning(account, container, pre_version_id)
721
        size_delta = size - del_size
722
        if size_delta > 0:
723
            account_quota = long(self._get_policy(account_node)['quota'])
724
            container_quota = long(self._get_policy(container_node)['quota'])
725
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
726
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
727
                # This must be executed in a transaction, so the version is never created if it fails.
728
                raise QuotaError
729
        self._report_size_change(user, account, size_delta, {'action': 'object update'})
730
        
731
        if permissions is not None:
732
            self.permissions.access_set(path, permissions)
733
            self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
734
        
735
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
736
        return dest_version_id
737
    
738
    @backend_method
739
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
740
        """Create/update an object with the specified size and partial hashes."""
741
        
742
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user, account, container, name, size, type, hashmap, checksum)
743
        if size == 0: # No such thing as an empty hashmap.
744
            hashmap = [self.put_block('')]
745
        map = HashMap(self.block_size, self.hash_algorithm)
746
        map.extend([binascii.unhexlify(x) for x in hashmap])
747
        missing = self.store.block_search(map)
748
        if missing:
749
            ie = IndexError()
750
            ie.data = [binascii.hexlify(x) for x in missing]
751
            raise ie
752
        
753
        hash = map.hash()
754
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
755
        self.store.map_put(hash, map)
756
        return dest_version_id
757
    
758
    @backend_method
759
    def update_object_checksum(self, user, account, container, name, version, checksum):
760
        """Update an object's checksum."""
761
        
762
        logger.debug("update_object_checksum: %s %s %s %s %s %s", user, account, container, name, version, checksum)
763
        # Update objects with greater version and same hashmap and size (fix metadata updates).
764
        self._can_write(user, account, container, name)
765
        path, node = self._lookup_object(account, container, name)
766
        props = self._get_version(node, version)
767
        versions = self.node.node_get_versions(node)
768
        for x in versions:
769
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
770
                self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
771
    
772
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
773
        dest_version_ids = []
774
        self._can_read(user, src_account, src_container, src_name)
775
        path, node = self._lookup_object(src_account, src_container, src_name)
776
        # TODO: Will do another fetch of the properties in duplicate version...
777
        props = self._get_version(node, src_version) # Check to see if source exists.
778
        src_version_id = props[self.SERIAL]
779
        hash = props[self.HASH]
780
        size = props[self.SIZE]
781
        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
782
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
783
        if is_move:
784
                self._delete_object(user, src_account, src_container, src_name)
785
        
786
        if delimiter:
787
            prefix = src_name + delimiter if not src_name.endswith(delimiter) else src_name
788
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
789
            paths = [elem[0] for elem in src_names]
790
            nodes = [elem[2] for elem in src_names]
791
            # TODO: Will do another fetch of the properties in duplicate version...
792
            props = self._get_versions(nodes) # Check to see if source exists.
793
            
794
            for prop, path, node in zip(props, paths, nodes):
795
                src_version_id = prop[self.SERIAL]
796
                hash = prop[self.HASH]
797
                vtype = prop[self.TYPE]
798
                dest_prefix = dest_name + delimiter if not dest_name.endswith(delimiter) else dest_name
799
                vdest_name = path.replace(prefix, dest_prefix, 1)
800
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
801
                if is_move:
802
                        self._delete_object(user, src_account, src_container, path)
803
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
804
    
805
    @backend_method
806
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
807
        """Copy an object's data and metadata."""
808
        
809
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
810
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
811
        return dest_version_id
812
    
813
    @backend_method
814
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
815
        """Move an object's data and metadata."""
816
        
817
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
818
        if user != src_account:
819
            raise NotAllowedError
820
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
821
        return dest_version_id
822
    
823
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
824
        if user != account:
825
            raise NotAllowedError
826
        
827
        if until is not None:
828
            path = '/'.join((account, container, name))
829
            node = self.node.node_lookup(path)
830
            if node is None:
831
                return
832
            hashes = []
833
            size = 0
834
            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
835
            hashes += h
836
            size += s
837
            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
838
            hashes += h
839
            size += s
840
            for h in hashes:
841
                self.store.map_delete(h)
842
            self.node.node_purge(node, until, CLUSTER_DELETED)
843
            try:
844
                props = self._get_version(node)
845
            except NameError:
846
                self.permissions.access_clear(path)
847
            self._report_size_change(user, account, -size, {'action': 'object purge'})
848
            return
849
        
850
        path, node = self._lookup_object(account, container, name)
851
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
852
        del_size = self._apply_versioning(account, container, src_version_id)
853
        if del_size:
854
            self._report_size_change(user, account, -del_size, {'action': 'object delete'})
855
        self._report_object_change(user, account, path, details={'action': 'object delete'})
856
        self.permissions.access_clear(path)
857
        
858
        if delimiter:
859
            prefix = name + delimiter if not name.endswith(delimiter) else name
860
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
861
            paths = []
862
            for t in src_names:
863
                    path = '/'.join((account, container, t[0]))
864
                    node = t[2]
865
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
866
                del_size = self._apply_versioning(account, container, src_version_id)
867
                if del_size:
868
                    self._report_size_change(user, account, -del_size, {'action': 'object delete'})
869
                self._report_object_change(user, account, path, details={'action': 'object delete'})
870
                paths.append(path)
871
            self.permissions.access_clear_bulk(paths)
872
    
873
    @backend_method
874
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
875
        """Delete/purge an object."""
876
        
877
        logger.debug("delete_object: %s %s %s %s %s %s %s", user, account, container, name, until, prefix, delimiter)
878
        self._delete_object(user, account, container, name, until, delimiter)
879
    
880
    @backend_method
881
    def list_versions(self, user, account, container, name):
882
        """Return a list of all (version, version_timestamp) tuples for an object."""
883
        
884
        logger.debug("list_versions: %s %s %s %s", user, account, container, name)
885
        self._can_read(user, account, container, name)
886
        path, node = self._lookup_object(account, container, name)
887
        versions = self.node.node_get_versions(node)
888
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
889
    
890
    @backend_method
891
    def get_uuid(self, user, uuid):
892
        """Return the (account, container, name) for the UUID given."""
893
        
894
        logger.debug("get_uuid: %s %s", user, uuid)
895
        info = self.node.latest_uuid(uuid)
896
        if info is None:
897
            raise NameError
898
        path, serial = info
899
        account, container, name = path.split('/', 2)
900
        self._can_read(user, account, container, name)
901
        return (account, container, name)
902
    
903
    @backend_method
904
    def get_public(self, user, public):
905
        """Return the (account, container, name) for the public id given."""
906
        
907
        logger.debug("get_public: %s %s", user, public)
908
        if public is None or public < ULTIMATE_ANSWER:
909
            raise NameError
910
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
911
        if path is None:
912
            raise NameError
913
        account, container, name = path.split('/', 2)
914
        self._can_read(user, account, container, name)
915
        return (account, container, name)
916
    
917
    @backend_method(autocommit=0)
918
    def get_block(self, hash):
919
        """Return a block's data."""
920
        
921
        logger.debug("get_block: %s", hash)
922
        block = self.store.block_get(binascii.unhexlify(hash))
923
        if not block:
924
            raise NameError('Block does not exist')
925
        return block
926
    
927
    @backend_method(autocommit=0)
928
    def put_block(self, data):
929
        """Store a block and return the hash."""
930
        
931
        logger.debug("put_block: %s", len(data))
932
        return binascii.hexlify(self.store.block_put(data))
933
    
934
    @backend_method(autocommit=0)
935
    def update_block(self, hash, data, offset=0):
936
        """Update a known block and return the hash."""
937
        
938
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
939
        if offset == 0 and len(data) == self.block_size:
940
            return self.put_block(data)
941
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
942
        return binascii.hexlify(h)
943
    
944
    # Path functions.
945
    
946
    def _generate_uuid(self):
947
        return str(uuidlib.uuid4())
948
    
949
    def _put_object_node(self, path, parent, name):
950
        path = '/'.join((path, name))
951
        node = self.node.node_lookup(path)
952
        if node is None:
953
            node = self.node.node_create(parent, path)
954
        return path, node
955
    
956
    def _put_path(self, user, parent, path):
957
        node = self.node.node_create(parent, path)
958
        self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
959
        return node
960
    
961
    def _lookup_account(self, account, create=True):
962
        node = self.node.node_lookup(account)
963
        if node is None and create:
964
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
965
        return account, node
966
    
967
    def _lookup_container(self, account, container):
968
        path = '/'.join((account, container))
969
        node = self.node.node_lookup(path)
970
        if node is None:
971
            raise NameError('Container does not exist')
972
        return path, node
973
    
974
    def _lookup_object(self, account, container, name):
975
        path = '/'.join((account, container, name))
976
        node = self.node.node_lookup(path)
977
        if node is None:
978
            raise NameError('Object does not exist')
979
        return path, node
980
    
981
    def _lookup_objects(self, paths):
982
            nodes = self.node.node_lookup_bulk(paths)
983
        if nodes is None:
984
            raise NameError('Object does not exist')
985
        return paths, nodes
986
    
987
    def _get_properties(self, node, until=None):
988
        """Return properties until the timestamp given."""
989
        
990
        before = until if until is not None else inf
991
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
992
        if props is None and until is not None:
993
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
994
        if props is None:
995
            raise NameError('Path does not exist')
996
        return props
997
    
998
    def _get_statistics(self, node, until=None):
999
        """Return count, sum of size and latest timestamp of everything under node."""
1000
        
1001
        if until is None:
1002
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1003
        else:
1004
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1005
        if stats is None:
1006
            stats = (0, 0, 0)
1007
        return stats
1008
    
1009
    def _get_version(self, node, version=None):
1010
        if version is None:
1011
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1012
            if props is None:
1013
                raise NameError('Object does not exist')
1014
        else:
1015
            try:
1016
                version = int(version)
1017
            except ValueError:
1018
                raise IndexError('Version does not exist')
1019
            props = self.node.version_get_properties(version)
1020
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1021
                raise IndexError('Version does not exist')
1022
        return props
1023

    
1024
    def _get_versions(self, nodes, version=None):
1025
        if version is None:
1026
            props = self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1027
            if not props:
1028
                raise NameError('Object does not exist')
1029
        else:
1030
            try:
1031
                version = int(version)
1032
            except ValueError:
1033
                raise IndexError('Version does not exist')
1034
            props = self.node.version_get_properties(version)
1035
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1036
                raise IndexError('Version does not exist')
1037
        return props
1038
    
1039
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1040
        """Create a new version of the node."""
1041
        
1042
        props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1043
        if props is not None:
1044
            src_version_id = props[self.SERIAL]
1045
            src_hash = props[self.HASH]
1046
            src_size = props[self.SIZE]
1047
            src_type = props[self.TYPE]
1048
            src_checksum = props[self.CHECKSUM]
1049
        else:
1050
            src_version_id = None
1051
            src_hash = None
1052
            src_size = 0
1053
            src_type = ''
1054
            src_checksum = ''
1055
        if size is None: # Set metadata.
1056
            hash = src_hash # This way hash can be set to None (account or container).
1057
            size = src_size
1058
        if type is None:
1059
            type = src_type
1060
        if checksum is None:
1061
            checksum = src_checksum
1062
        uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
1063
        
1064
        if src_node is None:
1065
            pre_version_id = src_version_id
1066
        else:
1067
            pre_version_id = None
1068
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1069
            if props is not None:
1070
                pre_version_id = props[self.SERIAL]
1071
        if pre_version_id is not None:
1072
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1073
        
1074
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1075
        return pre_version_id, dest_version_id
1076
    
1077
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1078
        if src_version_id is not None:
1079
            self.node.attribute_copy(src_version_id, dest_version_id)
1080
        if not replace:
1081
            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
1082
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
1083
        else:
1084
            self.node.attribute_del(dest_version_id, domain)
1085
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
1086
    
1087
    def _put_metadata(self, user, node, domain, meta, replace=False):
1088
        """Create a new version and store metadata."""
1089
        
1090
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
1091
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
1092
        return src_version_id, dest_version_id
1093
    
1094
    def _list_limits(self, listing, marker, limit):
1095
        start = 0
1096
        if marker:
1097
            try:
1098
                start = listing.index(marker) + 1
1099
            except ValueError:
1100
                pass
1101
        if not limit or limit > 10000:
1102
            limit = 10000
1103
        return start, limit
1104
    
1105
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1106
        cont_prefix = path + '/'
1107
        prefix = cont_prefix + prefix
1108
        start = cont_prefix + marker if marker else None
1109
        before = until if until is not None else inf
1110
        filterq = keys if domain else []
1111
        sizeq = size_range
1112
        
1113
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1114
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1115
        objects.sort(key=lambda x: x[0])
1116
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1117
        return objects
1118
        
1119
    # Reporting functions.
1120
    
1121
    def _report_size_change(self, user, account, size, details={}):
1122
        logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1123
        account_node = self._lookup_account(account, True)[1]
1124
        total = self._get_statistics(account_node)[1]
1125
        details.update({'user': user, 'total': total})
1126
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1127
    
1128
    def _report_object_change(self, user, account, path, details={}):
1129
        logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1130
        details.update({'user': user})
1131
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1132
    
1133
    def _report_sharing_change(self, user, account, path, details={}):
1134
        logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1135
        details.update({'user': user})
1136
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1137
    
1138
    # Policy functions.
1139
    
1140
    def _check_policy(self, policy):
1141
        for k in policy.keys():
1142
            if policy[k] == '':
1143
                policy[k] = self.default_policy.get(k)
1144
        for k, v in policy.iteritems():
1145
            if k == 'quota':
1146
                q = int(v) # May raise ValueError.
1147
                if q < 0:
1148
                    raise ValueError
1149
            elif k == 'versioning':
1150
                if v not in ['auto', 'none']:
1151
                    raise ValueError
1152
            else:
1153
                raise ValueError
1154
    
1155
    def _put_policy(self, node, policy, replace):
1156
        if replace:
1157
            for k, v in self.default_policy.iteritems():
1158
                if k not in policy:
1159
                    policy[k] = v
1160
        self.node.policy_set(node, policy)
1161
    
1162
    def _get_policy(self, node):
1163
        policy = self.default_policy.copy()
1164
        policy.update(self.node.policy_get(node))
1165
        return policy
1166
    
1167
    def _apply_versioning(self, account, container, version_id):
1168
        """Delete the provided version if such is the policy.
1169
           Return size of object removed.
1170
        """
1171
        
1172
        if version_id is None:
1173
            return 0
1174
        path, node = self._lookup_container(account, container)
1175
        versioning = self._get_policy(node)['versioning']
1176
        if versioning != 'auto':
1177
            hash, size = self.node.version_remove(version_id)
1178
            self.store.map_delete(hash)
1179
            return size
1180
        return 0
1181
    
1182
    # Access control functions.
1183
    
1184
    def _check_groups(self, groups):
1185
        # raise ValueError('Bad characters in groups')
1186
        pass
1187
    
1188
    def _check_permissions(self, path, permissions):
1189
        # raise ValueError('Bad characters in permissions')
1190
        pass
1191
    
1192
    def _get_formatted_paths(self, paths):
1193
        formatted = []
1194
        for p in paths:
1195
            node = self.node.node_lookup(p)
1196
            if node is not None:
1197
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1198
            if props is not None:
1199
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1200
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1201
                formatted.append((p, self.MATCH_EXACT))
1202
        return formatted
1203
    
1204
    def _get_permissions_path(self, account, container, name):
1205
        path = '/'.join((account, container, name))
1206
        permission_paths = self.permissions.access_inherit(path)
1207
        permission_paths.sort()
1208
        permission_paths.reverse()
1209
        for p in permission_paths:
1210
            if p == path:
1211
                return p
1212
            else:
1213
                if p.count('/') < 2:
1214
                    continue
1215
                node = self.node.node_lookup(p)
1216
                if node is not None:
1217
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1218
                if props is not None:
1219
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1220
                        return p
1221
        return None
1222
    
1223
    def _can_read(self, user, account, container, name):
1224
        if user == account:
1225
            return True
1226
        path = '/'.join((account, container, name))
1227
        if self.permissions.public_get(path) is not None:
1228
            return True
1229
        path = self._get_permissions_path(account, container, name)
1230
        if not path:
1231
            raise NotAllowedError
1232
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1233
            raise NotAllowedError
1234
    
1235
    def _can_write(self, user, account, container, name):
1236
        if user == account:
1237
            return True
1238
        path = '/'.join((account, container, name))
1239
        path = self._get_permissions_path(account, container, name)
1240
        if not path:
1241
            raise NotAllowedError
1242
        if not self.permissions.access_check(path, self.WRITE, user):
1243
            raise NotAllowedError
1244
    
1245
    def _allowed_accounts(self, user):
1246
        allow = set()
1247
        for path in self.permissions.access_list_paths(user):
1248
            allow.add(path.split('/', 1)[0])
1249
        return sorted(allow)
1250
    
1251
    def _allowed_containers(self, user, account):
1252
        allow = set()
1253
        for path in self.permissions.access_list_paths(user, account):
1254
            allow.add(path.split('/', 2)[1])
1255
        return sorted(allow)