Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ a8b82467

History | View | Annotate | Download (57.9 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
43
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
44

    
45
# Stripped-down version of the HashMap class found in tools.
46
class HashMap(list):
47

    
48
    def __init__(self, blocksize, blockhash):
49
        super(HashMap, self).__init__()
50
        self.blocksize = blocksize
51
        self.blockhash = blockhash
52

    
53
    def _hash_raw(self, v):
54
        h = hashlib.new(self.blockhash)
55
        h.update(v)
56
        return h.digest()
57

    
58
    def hash(self):
59
        if len(self) == 0:
60
            return self._hash_raw('')
61
        if len(self) == 1:
62
            return self.__getitem__(0)
63

    
64
        h = list(self)
65
        s = 2
66
        while s < len(h):
67
            s = s * 2
68
        h += [('\x00' * len(h[0]))] * (s - len(h))
69
        while len(h) > 1:
70
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
71
        return h[0]
72

    
73
# Default modules and settings.
74
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
75
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
76
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
77
DEFAULT_BLOCK_PATH = 'data/'
78
DEFAULT_BLOCK_UMASK = 0o022
79
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
80
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
81

    
82
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
83
QUEUE_CLIENT_ID = 'pithos'
84
QUEUE_INSTANCE_ID = '1'
85

    
86
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
87

    
88
inf = float('inf')
89

    
90
ULTIMATE_ANSWER = 42
91

    
92

    
93
logger = logging.getLogger(__name__)
94

    
95

    
96
def backend_method(func=None, autocommit=1):
97
    if func is None:
98
        def fn(func):
99
            return backend_method(func, autocommit)
100
        return fn
101

    
102
    if not autocommit:
103
        return func
104
    def fn(self, *args, **kw):
105
        self.wrapper.execute()
106
        try:
107
            self.messages = []
108
            ret = func(self, *args, **kw)
109
            for m in self.messages:
110
                self.queue.send(*m)
111
            self.wrapper.commit()
112
            return ret
113
        except:
114
            self.wrapper.rollback()
115
            raise
116
    return fn
117

    
118

    
119
class ModularBackend(BaseBackend):
120
    """A modular backend.
121
    
122
    Uses modules for SQL functions and storage.
123
    """
124
    
125
    def __init__(self, db_module=None, db_connection=None,
126
                 block_module=None, block_path=None, block_umask=None,
127
                 queue_module=None, queue_connection=None):
128
        db_module = db_module or DEFAULT_DB_MODULE
129
        db_connection = db_connection or DEFAULT_DB_CONNECTION
130
        block_module = block_module or DEFAULT_BLOCK_MODULE
131
        block_path = block_path or DEFAULT_BLOCK_PATH
132
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
133
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
134
        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
135
        
136
        self.hash_algorithm = 'sha256'
137
        self.block_size = 4 * 1024 * 1024 # 4MB
138
        
139
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
140
        
141
        def load_module(m):
142
            __import__(m)
143
            return sys.modules[m]
144
        
145
        self.db_module = load_module(db_module)
146
        self.wrapper = self.db_module.DBWrapper(db_connection)
147
        params = {'wrapper': self.wrapper}
148
        self.permissions = self.db_module.Permissions(**params)
149
        for x in ['READ', 'WRITE']:
150
            setattr(self, x, getattr(self.db_module, x))
151
        self.node = self.db_module.Node(**params)
152
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
153
            setattr(self, x, getattr(self.db_module, x))
154
        
155
        self.block_module = load_module(block_module)
156
        params = {'path': block_path,
157
                  'block_size': self.block_size,
158
                  'hash_algorithm': self.hash_algorithm,
159
                  'umask': block_umask}
160
        self.store = self.block_module.Store(**params)
161

    
162
        if queue_module and queue_connection:
163
            self.queue_module = load_module(queue_module)
164
            params = {'exchange': queue_connection,
165
                      'client_id': QUEUE_CLIENT_ID}
166
            self.queue = self.queue_module.Queue(**params)
167
        else:
168
            class NoQueue:
169
                def send(self, *args):
170
                    pass
171
                
172
                def close(self):
173
                    pass
174
            
175
            self.queue = NoQueue()
176
    
177
    def close(self):
178
        self.wrapper.close()
179
        self.queue.close()
180
    
181
    @backend_method
182
    def list_accounts(self, user, marker=None, limit=10000):
183
        """Return a list of accounts the user can access."""
184
        
185
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
186
        allowed = self._allowed_accounts(user)
187
        start, limit = self._list_limits(allowed, marker, limit)
188
        return allowed[start:start + limit]
189
    
190
    @backend_method
191
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
192
        """Return a dictionary with the account metadata for the domain."""
193
        
194
        logger.debug("get_account_meta: %s %s %s %s", user, account, domain, until)
195
        path, node = self._lookup_account(account, user == account)
196
        if user != account:
197
            if until or node is None or account not in self._allowed_accounts(user):
198
                raise NotAllowedError
199
        try:
200
            props = self._get_properties(node, until)
201
            mtime = props[self.MTIME]
202
        except NameError:
203
            props = None
204
            mtime = until
205
        count, bytes, tstamp = self._get_statistics(node, until)
206
        tstamp = max(tstamp, mtime)
207
        if until is None:
208
            modified = tstamp
209
        else:
210
            modified = self._get_statistics(node)[2] # Overall last modification.
211
            modified = max(modified, mtime)
212
        
213
        if user != account:
214
            meta = {'name': account}
215
        else:
216
            meta = {}
217
            if props is not None and include_user_defined:
218
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
219
            if until is not None:
220
                meta.update({'until_timestamp': tstamp})
221
            meta.update({'name': account, 'count': count, 'bytes': bytes})
222
        meta.update({'modified': modified})
223
        return meta
224
    
225
    @backend_method
226
    def update_account_meta(self, user, account, domain, meta, replace=False):
227
        """Update the metadata associated with the account for the domain."""
228
        
229
        logger.debug("update_account_meta: %s %s %s %s %s", user, account, domain, meta, replace)
230
        if user != account:
231
            raise NotAllowedError
232
        path, node = self._lookup_account(account, True)
233
        self._put_metadata(user, node, domain, meta, replace)
234
    
235
    @backend_method
236
    def get_account_groups(self, user, account):
237
        """Return a dictionary with the user groups defined for this account."""
238
        
239
        logger.debug("get_account_groups: %s %s", user, account)
240
        if user != account:
241
            if account not in self._allowed_accounts(user):
242
                raise NotAllowedError
243
            return {}
244
        self._lookup_account(account, True)
245
        return self.permissions.group_dict(account)
246
    
247
    @backend_method
248
    def update_account_groups(self, user, account, groups, replace=False):
249
        """Update the groups associated with the account."""
250
        
251
        logger.debug("update_account_groups: %s %s %s %s", user, account, groups, replace)
252
        if user != account:
253
            raise NotAllowedError
254
        self._lookup_account(account, True)
255
        self._check_groups(groups)
256
        if replace:
257
            self.permissions.group_destroy(account)
258
        for k, v in groups.iteritems():
259
            if not replace: # If not already deleted.
260
                self.permissions.group_delete(account, k)
261
            if v:
262
                self.permissions.group_addmany(account, k, v)
263
    
264
    @backend_method
265
    def get_account_policy(self, user, account):
266
        """Return a dictionary with the account policy."""
267
        
268
        logger.debug("get_account_policy: %s %s", user, account)
269
        if user != account:
270
            if account not in self._allowed_accounts(user):
271
                raise NotAllowedError
272
            return {}
273
        path, node = self._lookup_account(account, True)
274
        return self._get_policy(node)
275
    
276
    @backend_method
277
    def update_account_policy(self, user, account, policy, replace=False):
278
        """Update the policy associated with the account."""
279
        
280
        logger.debug("update_account_policy: %s %s %s %s", user, account, policy, replace)
281
        if user != account:
282
            raise NotAllowedError
283
        path, node = self._lookup_account(account, True)
284
        self._check_policy(policy)
285
        self._put_policy(node, policy, replace)
286
    
287
    @backend_method
288
    def put_account(self, user, account, policy={}):
289
        """Create a new account with the given name."""
290
        
291
        logger.debug("put_account: %s %s %s", user, account, policy)
292
        if user != account:
293
            raise NotAllowedError
294
        node = self.node.node_lookup(account)
295
        if node is not None:
296
            raise AccountExists('Account already exists')
297
        if policy:
298
            self._check_policy(policy)
299
        node = self._put_path(user, self.ROOTNODE, account)
300
        self._put_policy(node, policy, True)
301
    
302
    @backend_method
303
    def delete_account(self, user, account):
304
        """Delete the account with the given name."""
305
        
306
        logger.debug("delete_account: %s %s", user, account)
307
        if user != account:
308
            raise NotAllowedError
309
        node = self.node.node_lookup(account)
310
        if node is None:
311
            return
312
        if not self.node.node_remove(node):
313
            raise AccountNotEmpty('Account is not empty')
314
        self.permissions.group_destroy(account)
315
    
316
    @backend_method
317
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
318
        """Return a list of containers existing under an account."""
319
        
320
        logger.debug("list_containers: %s %s %s %s %s %s %s", user, account, marker, limit, shared, until, public)
321
        if user != account:
322
            if until or account not in self._allowed_accounts(user):
323
                raise NotAllowedError
324
            allowed = self._allowed_containers(user, account)
325
            start, limit = self._list_limits(allowed, marker, limit)
326
            return allowed[start:start + limit]
327
        if shared or public:
328
            allowed = set()
329
            if shared:
330
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
331
            if public:
332
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
333
            allowed = sorted(allowed)
334
            start, limit = self._list_limits(allowed, marker, limit)
335
            return allowed[start:start + limit]
336
        node = self.node.node_lookup(account)
337
        containers = [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
338
        start, limit = self._list_limits([x[0] for x in containers], marker, limit)
339
        return containers[start:start + limit]
340
    
341
    @backend_method
342
    def list_container_meta(self, user, account, container, domain, until=None):
343
        """Return a list with all the container's object meta keys for the domain."""
344
        
345
        logger.debug("list_container_meta: %s %s %s %s %s", user, account, container, domain, until)
346
        allowed = []
347
        if user != account:
348
            if until:
349
                raise NotAllowedError
350
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
351
            if not allowed:
352
                raise NotAllowedError
353
        path, node = self._lookup_container(account, container)
354
        before = until if until is not None else inf
355
        allowed = self._get_formatted_paths(allowed)
356
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
357
    
358
    @backend_method
359
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
360
        """Return a dictionary with the container metadata for the domain."""
361
        
362
        logger.debug("get_container_meta: %s %s %s %s %s", user, account, container, domain, until)
363
        if user != account:
364
            if until or container not in self._allowed_containers(user, account):
365
                raise NotAllowedError
366
        path, node = self._lookup_container(account, container)
367
        props = self._get_properties(node, until)
368
        mtime = props[self.MTIME]
369
        count, bytes, tstamp = self._get_statistics(node, until)
370
        tstamp = max(tstamp, mtime)
371
        if until is None:
372
            modified = tstamp
373
        else:
374
            modified = self._get_statistics(node)[2] # Overall last modification.
375
            modified = max(modified, mtime)
376
        
377
        if user != account:
378
            meta = {'name': container}
379
        else:
380
            meta = {}
381
            if include_user_defined:
382
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
383
            if until is not None:
384
                meta.update({'until_timestamp': tstamp})
385
            meta.update({'name': container, 'count': count, 'bytes': bytes})
386
        meta.update({'modified': modified})
387
        return meta
388
    
389
    @backend_method
390
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
391
        """Update the metadata associated with the container for the domain."""
392
        
393
        logger.debug("update_container_meta: %s %s %s %s %s %s", user, account, container, domain, meta, replace)
394
        if user != account:
395
            raise NotAllowedError
396
        path, node = self._lookup_container(account, container)
397
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
398
        if src_version_id is not None:
399
            versioning = self._get_policy(node)['versioning']
400
            if versioning != 'auto':
401
                self.node.version_remove(src_version_id)
402
    
403
    @backend_method
404
    def get_container_policy(self, user, account, container):
405
        """Return a dictionary with the container policy."""
406
        
407
        logger.debug("get_container_policy: %s %s %s", user, account, container)
408
        if user != account:
409
            if container not in self._allowed_containers(user, account):
410
                raise NotAllowedError
411
            return {}
412
        path, node = self._lookup_container(account, container)
413
        return self._get_policy(node)
414
    
415
    @backend_method
416
    def update_container_policy(self, user, account, container, policy, replace=False):
417
        """Update the policy associated with the container."""
418
        
419
        logger.debug("update_container_policy: %s %s %s %s %s", user, account, container, policy, replace)
420
        if user != account:
421
            raise NotAllowedError
422
        path, node = self._lookup_container(account, container)
423
        self._check_policy(policy)
424
        self._put_policy(node, policy, replace)
425
    
426
    @backend_method
427
    def put_container(self, user, account, container, policy={}):
428
        """Create a new container with the given name."""
429
        
430
        logger.debug("put_container: %s %s %s %s", user, account, container, policy)
431
        if user != account:
432
            raise NotAllowedError
433
        try:
434
            path, node = self._lookup_container(account, container)
435
        except NameError:
436
            pass
437
        else:
438
            raise ContainerExists('Container already exists')
439
        if policy:
440
            self._check_policy(policy)
441
        path = '/'.join((account, container))
442
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
443
        self._put_policy(node, policy, True)
444
    
445
    @backend_method
446
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
447
        """Delete/purge the container with the given name."""
448
        
449
        logger.debug("delete_container: %s %s %s %s %s %s", user, account, container, until, prefix, delimiter)
450
        if user != account:
451
            raise NotAllowedError
452
        path, node = self._lookup_container(account, container)
453
        
454
        if until is not None:
455
            hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
456
            for h in hashes:
457
                self.store.map_delete(h)
458
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
459
            self._report_size_change(user, account, -size, {'action': 'container purge', 'path':path})
460
            return
461
        
462
        if not delimiter:
463
            if self._get_statistics(node)[0] > 0:
464
                raise ContainerNotEmpty('Container is not empty')
465
            hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
466
            for h in hashes:
467
                self.store.map_delete(h)
468
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
469
            self.node.node_remove(node)
470
            self._report_size_change(user, account, -size, {'action': 'container delete', 'path':path})
471
        else:
472
                # remove only contents
473
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
474
            paths = []
475
            for t in src_names:
476
                path = '/'.join((account, container, t[0]))
477
                node = t[2]
478
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
479
                del_size = self._apply_versioning(account, container, src_version_id)
480
                if del_size:
481
                    self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path':path})
482
                self._report_object_change(user, account, path, details={'action': 'object delete'})
483
                paths.append(path)
484
            self.permissions.access_clear_bulk(paths)
485
    
486
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
487
        if user != account and until:
488
            raise NotAllowedError
489
        if shared and public:
490
            # get shared first
491
            shared = self._list_object_permissions(user, account, container, prefix, shared=True, public=False)
492
            objects = set()
493
            if shared:
494
                path, node = self._lookup_container(account, container)
495
                shared = self._get_formatted_paths(shared)
496
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
497
            
498
            # get public
499
            objects |= set(self._list_public_object_properties(user, account, container, prefix, all_props))
500
            objects = list(objects)
501
            
502
            objects.sort(key=lambda x: x[0])
503
            start, limit = self._list_limits([x[0] for x in objects], marker, limit)
504
            return objects[start:start + limit]
505
        elif public:
506
            objects = self._list_public_object_properties(user, account, container, prefix, all_props)
507
            start, limit = self._list_limits([x[0] for x in objects], marker, limit)
508
            return objects[start:start + limit]
509
        
510
        allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
511
        if shared and not allowed:
512
            return []
513
        path, node = self._lookup_container(account, container)
514
        allowed = self._get_formatted_paths(allowed)
515
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
516
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
517
        return objects[start:start + limit]
518
    
519
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
520
        public = self._list_object_permissions(user, account, container, prefix, shared=False, public=True)
521
        paths, nodes = self._lookup_objects(public)
522
        path = '/'.join((account, container))
523
        cont_prefix = path + '/'
524
        paths = [x[len(cont_prefix):] for x in paths]
525
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
526
        objects = [(path,) + props for path, props in zip(paths, props)]
527
        return objects
528
        
529
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
530
        objects = []
531
        while True:
532
            marker = objects[-1] if objects else None
533
            limit = 10000
534
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
535
            objects.extend(l)
536
            if not l or len(l) < limit:
537
                break
538
        return objects
539
    
540
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
541
        allowed = []
542
        path = '/'.join((account, container, prefix)).rstrip('/')
543
        if user != account:
544
            allowed = self.permissions.access_list_paths(user, path)
545
            if not allowed:
546
                raise NotAllowedError
547
        else:
548
            allowed = set()
549
            if shared:
550
                allowed.update(self.permissions.access_list_shared(path))
551
            if public:
552
                allowed.update([x[0] for x in self.permissions.public_list(path)])
553
            allowed = sorted(allowed)
554
            if not allowed:
555
                return []
556
        return allowed
557
    
558
    @backend_method
559
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
560
        """Return a list of object (name, version_id) tuples existing under a container."""
561
        
562
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
563
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
564
    
565
    @backend_method
566
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
567
        """Return a list of object metadata dicts existing under a container."""
568
        
569
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
570
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
571
        objects = []
572
        for p in props:
573
            if len(p) == 2:
574
                objects.append({'subdir': p[0]})
575
            else:
576
                objects.append({'name': p[0],
577
                                'bytes': p[self.SIZE + 1],
578
                                'type': p[self.TYPE + 1],
579
                                'hash': p[self.HASH + 1],
580
                                'version': p[self.SERIAL + 1],
581
                                'version_timestamp': p[self.MTIME + 1],
582
                                'modified': p[self.MTIME + 1] if until is None else None,
583
                                'modified_by': p[self.MUSER + 1],
584
                                'uuid': p[self.UUID + 1],
585
                                'checksum': p[self.CHECKSUM + 1]})
586
        return objects
587
    
588
    @backend_method
589
    def list_object_permissions(self, user, account, container, prefix=''):
590
        """Return a list of paths that enforce permissions under a container."""
591
        
592
        logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix)
593
        return self._list_object_permissions(user, account, container, prefix, True, False)
594
    
595
    @backend_method
596
    def list_object_public(self, user, account, container, prefix=''):
597
        """Return a dict mapping paths to public ids for objects that are public under a container."""
598
        
599
        logger.debug("list_object_public: %s %s %s %s", user, account, container, prefix)
600
        public = {}
601
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
602
            public[path] = p + ULTIMATE_ANSWER
603
        return public
604
    
605
    @backend_method
606
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
607
        """Return a dictionary with the object metadata for the domain."""
608
        
609
        logger.debug("get_object_meta: %s %s %s %s %s %s", user, account, container, name, domain, version)
610
        self._can_read(user, account, container, name)
611
        path, node = self._lookup_object(account, container, name)
612
        props = self._get_version(node, version)
613
        if version is None:
614
            modified = props[self.MTIME]
615
        else:
616
            try:
617
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
618
            except NameError: # Object may be deleted.
619
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
620
                if del_props is None:
621
                    raise ItemNotExists('Object does not exist')
622
                modified = del_props[self.MTIME]
623
        
624
        meta = {}
625
        if include_user_defined:
626
            meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
627
        meta.update({'name': name,
628
                     'bytes': props[self.SIZE],
629
                     'type': props[self.TYPE],
630
                     'hash': props[self.HASH],
631
                     'version': props[self.SERIAL],
632
                     'version_timestamp': props[self.MTIME],
633
                     'modified': modified,
634
                     'modified_by': props[self.MUSER],
635
                     'uuid': props[self.UUID],
636
                     'checksum': props[self.CHECKSUM]})
637
        return meta
638
    
639
    @backend_method
640
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
641
        """Update the metadata associated with the object for the domain and return the new version."""
642
        
643
        logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace)
644
        self._can_write(user, account, container, name)
645
        path, node = self._lookup_object(account, container, name)
646
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
647
        self._apply_versioning(account, container, src_version_id)
648
        return dest_version_id
649
    
650
    @backend_method
651
    def get_object_permissions(self, user, account, container, name):
652
        """Return the action allowed on the object, the path
653
        from which the object gets its permissions from,
654
        along with a dictionary containing the permissions."""
655
        
656
        logger.debug("get_object_permissions: %s %s %s %s", user, account, container, name)
657
        allowed = 'write'
658
        permissions_path = self._get_permissions_path(account, container, name)
659
        if user != account:
660
            if self.permissions.access_check(permissions_path, self.WRITE, user):
661
                allowed = 'write'
662
            elif self.permissions.access_check(permissions_path, self.READ, user):
663
                allowed = 'read'
664
            else:
665
                raise NotAllowedError
666
        self._lookup_object(account, container, name)
667
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
668
    
669
    @backend_method
670
    def update_object_permissions(self, user, account, container, name, permissions):
671
        """Update the permissions associated with the object."""
672
        
673
        logger.debug("update_object_permissions: %s %s %s %s %s", user, account, container, name, permissions)
674
        if user != account:
675
            raise NotAllowedError
676
        path = self._lookup_object(account, container, name)[0]
677
        self._check_permissions(path, permissions)
678
        self.permissions.access_set(path, permissions)
679
        self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
680
    
681
    @backend_method
682
    def get_object_public(self, user, account, container, name):
683
        """Return the public id of the object if applicable."""
684
        
685
        logger.debug("get_object_public: %s %s %s %s", user, account, container, name)
686
        self._can_read(user, account, container, name)
687
        path = self._lookup_object(account, container, name)[0]
688
        p = self.permissions.public_get(path)
689
        if p is not None:
690
            p += ULTIMATE_ANSWER
691
        return p
692
    
693
    @backend_method
694
    def update_object_public(self, user, account, container, name, public):
695
        """Update the public status of the object."""
696
        
697
        logger.debug("update_object_public: %s %s %s %s %s", user, account, container, name, public)
698
        self._can_write(user, account, container, name)
699
        path = self._lookup_object(account, container, name)[0]
700
        if not public:
701
            self.permissions.public_unset(path)
702
        else:
703
            self.permissions.public_set(path)
704
    
705
    @backend_method
706
    def get_object_hashmap(self, user, account, container, name, version=None):
707
        """Return the object's size and a list with partial hashes."""
708
        
709
        logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version)
710
        self._can_read(user, account, container, name)
711
        path, node = self._lookup_object(account, container, name)
712
        props = self._get_version(node, version)
713
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
714
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
715
    
716
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
717
        if permissions is not None and user != account:
718
            raise NotAllowedError
719
        self._can_write(user, account, container, name)
720
        if permissions is not None:
721
            path = '/'.join((account, container, name))
722
            self._check_permissions(path, permissions)
723
        
724
        account_path, account_node = self._lookup_account(account, True)
725
        container_path, container_node = self._lookup_container(account, container)
726
        path, node = self._put_object_node(container_path, container_node, name)
727
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
728
        
729
        # Handle meta.
730
        if src_version_id is None:
731
            src_version_id = pre_version_id
732
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
733
        
734
        # Check quota.
735
        del_size = self._apply_versioning(account, container, pre_version_id)
736
        size_delta = size - del_size
737
        if size_delta > 0:
738
            account_quota = long(self._get_policy(account_node)['quota'])
739
            container_quota = long(self._get_policy(container_node)['quota'])
740
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
741
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
742
                # This must be executed in a transaction, so the version is never created if it fails.
743
                raise QuotaError
744
        self._report_size_change(user, account, size_delta, {'action': 'object update', 'path':path})
745
        
746
        if permissions is not None:
747
            self.permissions.access_set(path, permissions)
748
            self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
749
        
750
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
751
        return dest_version_id
752
    
753
    @backend_method
754
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
755
        """Create/update an object with the specified size and partial hashes."""
756
        
757
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user, account, container, name, size, type, hashmap, checksum)
758
        if size == 0: # No such thing as an empty hashmap.
759
            hashmap = [self.put_block('')]
760
        map = HashMap(self.block_size, self.hash_algorithm)
761
        map.extend([binascii.unhexlify(x) for x in hashmap])
762
        missing = self.store.block_search(map)
763
        if missing:
764
            ie = IndexError()
765
            ie.data = [binascii.hexlify(x) for x in missing]
766
            raise ie
767
        
768
        hash = map.hash()
769
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
770
        self.store.map_put(hash, map)
771
        return dest_version_id
772
    
773
    @backend_method
774
    def update_object_checksum(self, user, account, container, name, version, checksum):
775
        """Update an object's checksum."""
776
        
777
        logger.debug("update_object_checksum: %s %s %s %s %s %s", user, account, container, name, version, checksum)
778
        # Update objects with greater version and same hashmap and size (fix metadata updates).
779
        self._can_write(user, account, container, name)
780
        path, node = self._lookup_object(account, container, name)
781
        props = self._get_version(node, version)
782
        versions = self.node.node_get_versions(node)
783
        for x in versions:
784
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
785
                self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
786
    
787
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
788
        dest_version_ids = []
789
        self._can_read(user, src_account, src_container, src_name)
790
        path, node = self._lookup_object(src_account, src_container, src_name)
791
        # TODO: Will do another fetch of the properties in duplicate version...
792
        props = self._get_version(node, src_version) # Check to see if source exists.
793
        src_version_id = props[self.SERIAL]
794
        hash = props[self.HASH]
795
        size = props[self.SIZE]
796
        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
797
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
798
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
799
                self._delete_object(user, src_account, src_container, src_name)
800
        
801
        if delimiter:
802
            prefix = src_name + delimiter if not src_name.endswith(delimiter) else src_name
803
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
804
            src_names.sort(key=lambda x: x[2]) # order by nodes
805
            paths = [elem[0] for elem in src_names]
806
            nodes = [elem[2] for elem in src_names]
807
            # TODO: Will do another fetch of the properties in duplicate version...
808
            props = self._get_versions(nodes) # Check to see if source exists.
809
            
810
            for prop, path, node in zip(props, paths, nodes):
811
                src_version_id = prop[self.SERIAL]
812
                hash = prop[self.HASH]
813
                vtype = prop[self.TYPE]
814
                size = prop[self.SIZE]
815
                dest_prefix = dest_name + delimiter if not dest_name.endswith(delimiter) else dest_name
816
                vdest_name = path.replace(prefix, dest_prefix, 1)
817
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
818
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
819
                        self._delete_object(user, src_account, src_container, path)
820
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
821
    
822
    @backend_method
823
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
824
        """Copy an object's data and metadata."""
825
        
826
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
827
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
828
        return dest_version_id
829
    
830
    @backend_method
831
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
832
        """Move an object's data and metadata."""
833
        
834
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
835
        if user != src_account:
836
            raise NotAllowedError
837
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
838
        return dest_version_id
839
    
840
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
841
        if user != account:
842
            raise NotAllowedError
843
        
844
        if until is not None:
845
            path = '/'.join((account, container, name))
846
            node = self.node.node_lookup(path)
847
            if node is None:
848
                return
849
            hashes = []
850
            size = 0
851
            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
852
            hashes += h
853
            size += s
854
            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
855
            hashes += h
856
            size += s
857
            for h in hashes:
858
                self.store.map_delete(h)
859
            self.node.node_purge(node, until, CLUSTER_DELETED)
860
            try:
861
                props = self._get_version(node)
862
            except NameError:
863
                self.permissions.access_clear(path)
864
            self._report_size_change(user, account, -size, {'action': 'object purge', 'path':path})
865
            return
866
        
867
        path, node = self._lookup_object(account, container, name)
868
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
869
        del_size = self._apply_versioning(account, container, src_version_id)
870
        if del_size:
871
            self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path':path})
872
        self._report_object_change(user, account, path, details={'action': 'object delete'})
873
        self.permissions.access_clear(path)
874
        
875
        if delimiter:
876
            prefix = name + delimiter if not name.endswith(delimiter) else name
877
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
878
            paths = []
879
            for t in src_names:
880
                    path = '/'.join((account, container, t[0]))
881
                    node = t[2]
882
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
883
                del_size = self._apply_versioning(account, container, src_version_id)
884
                if del_size:
885
                    self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path':path})
886
                self._report_object_change(user, account, path, details={'action': 'object delete'})
887
                paths.append(path)
888
            self.permissions.access_clear_bulk(paths)
889
    
890
    @backend_method
891
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
892
        """Delete/purge an object."""
893
        
894
        logger.debug("delete_object: %s %s %s %s %s %s %s", user, account, container, name, until, prefix, delimiter)
895
        self._delete_object(user, account, container, name, until, delimiter)
896
    
897
    @backend_method
898
    def list_versions(self, user, account, container, name):
899
        """Return a list of all (version, version_timestamp) tuples for an object."""
900
        
901
        logger.debug("list_versions: %s %s %s %s", user, account, container, name)
902
        self._can_read(user, account, container, name)
903
        path, node = self._lookup_object(account, container, name)
904
        versions = self.node.node_get_versions(node)
905
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
906
    
907
    @backend_method
908
    def get_uuid(self, user, uuid):
909
        """Return the (account, container, name) for the UUID given."""
910
        
911
        logger.debug("get_uuid: %s %s", user, uuid)
912
        info = self.node.latest_uuid(uuid)
913
        if info is None:
914
            raise NameError
915
        path, serial = info
916
        account, container, name = path.split('/', 2)
917
        self._can_read(user, account, container, name)
918
        return (account, container, name)
919
    
920
    @backend_method
921
    def get_public(self, user, public):
922
        """Return the (account, container, name) for the public id given."""
923
        
924
        logger.debug("get_public: %s %s", user, public)
925
        if public is None or public < ULTIMATE_ANSWER:
926
            raise NameError
927
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
928
        if path is None:
929
            raise NameError
930
        account, container, name = path.split('/', 2)
931
        self._can_read(user, account, container, name)
932
        return (account, container, name)
933
    
934
    @backend_method(autocommit=0)
935
    def get_block(self, hash):
936
        """Return a block's data."""
937
        
938
        logger.debug("get_block: %s", hash)
939
        block = self.store.block_get(binascii.unhexlify(hash))
940
        if not block:
941
            raise ItemNotExists('Block does not exist')
942
        return block
943
    
944
    @backend_method(autocommit=0)
945
    def put_block(self, data):
946
        """Store a block and return the hash."""
947
        
948
        logger.debug("put_block: %s", len(data))
949
        return binascii.hexlify(self.store.block_put(data))
950
    
951
    @backend_method(autocommit=0)
952
    def update_block(self, hash, data, offset=0):
953
        """Update a known block and return the hash."""
954
        
955
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
956
        if offset == 0 and len(data) == self.block_size:
957
            return self.put_block(data)
958
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
959
        return binascii.hexlify(h)
960
    
961
    # Path functions.
962
    
963
    def _generate_uuid(self):
964
        return str(uuidlib.uuid4())
965
    
966
    def _put_object_node(self, path, parent, name):
967
        path = '/'.join((path, name))
968
        node = self.node.node_lookup(path)
969
        if node is None:
970
            node = self.node.node_create(parent, path)
971
        return path, node
972
    
973
    def _put_path(self, user, parent, path):
974
        node = self.node.node_create(parent, path)
975
        self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
976
        return node
977
    
978
    def _lookup_account(self, account, create=True):
979
        node = self.node.node_lookup(account)
980
        if node is None and create:
981
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
982
        return account, node
983
    
984
    def _lookup_container(self, account, container):
985
        path = '/'.join((account, container))
986
        node = self.node.node_lookup(path)
987
        if node is None:
988
            raise ItemNotExists('Container does not exist')
989
        return path, node
990
    
991
    def _lookup_object(self, account, container, name):
992
        path = '/'.join((account, container, name))
993
        node = self.node.node_lookup(path)
994
        if node is None:
995
            raise ItemNotExists('Object does not exist')
996
        return path, node
997
    
998
    def _lookup_objects(self, paths):
999
        nodes = self.node.node_lookup_bulk(paths)
1000
        return paths, nodes
1001
    
1002
    def _get_properties(self, node, until=None):
1003
        """Return properties until the timestamp given."""
1004
        
1005
        before = until if until is not None else inf
1006
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1007
        if props is None and until is not None:
1008
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1009
        if props is None:
1010
            raise ItemNotExists('Path does not exist')
1011
        return props
1012
    
1013
    def _get_statistics(self, node, until=None):
1014
        """Return count, sum of size and latest timestamp of everything under node."""
1015
        
1016
        if until is None:
1017
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1018
        else:
1019
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1020
        if stats is None:
1021
            stats = (0, 0, 0)
1022
        return stats
1023
    
1024
    def _get_version(self, node, version=None):
1025
        if version is None:
1026
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1027
            if props is None:
1028
                raise ItemNotExists('Object does not exist')
1029
        else:
1030
            try:
1031
                version = int(version)
1032
            except ValueError:
1033
                raise VersionNotExists('Version does not exist')
1034
            props = self.node.version_get_properties(version)
1035
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1036
                raise VersionNotExists('Version does not exist')
1037
        return props
1038

    
1039
    def _get_versions(self, nodes):
1040
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1041
    
1042
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1043
        """Create a new version of the node."""
1044
        
1045
        props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1046
        if props is not None:
1047
            src_version_id = props[self.SERIAL]
1048
            src_hash = props[self.HASH]
1049
            src_size = props[self.SIZE]
1050
            src_type = props[self.TYPE]
1051
            src_checksum = props[self.CHECKSUM]
1052
        else:
1053
            src_version_id = None
1054
            src_hash = None
1055
            src_size = 0
1056
            src_type = ''
1057
            src_checksum = ''
1058
        if size is None: # Set metadata.
1059
            hash = src_hash # This way hash can be set to None (account or container).
1060
            size = src_size
1061
        if type is None:
1062
            type = src_type
1063
        if checksum is None:
1064
            checksum = src_checksum
1065
        uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
1066
        
1067
        if src_node is None:
1068
            pre_version_id = src_version_id
1069
        else:
1070
            pre_version_id = None
1071
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1072
            if props is not None:
1073
                pre_version_id = props[self.SERIAL]
1074
        if pre_version_id is not None:
1075
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1076
        
1077
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1078
        return pre_version_id, dest_version_id
1079
    
1080
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1081
        if src_version_id is not None:
1082
            self.node.attribute_copy(src_version_id, dest_version_id)
1083
        if not replace:
1084
            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
1085
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
1086
        else:
1087
            self.node.attribute_del(dest_version_id, domain)
1088
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
1089
    
1090
    def _put_metadata(self, user, node, domain, meta, replace=False):
1091
        """Create a new version and store metadata."""
1092
        
1093
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
1094
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
1095
        return src_version_id, dest_version_id
1096
    
1097
    def _list_limits(self, listing, marker, limit):
1098
        start = 0
1099
        if marker:
1100
            try:
1101
                start = listing.index(marker) + 1
1102
            except ValueError:
1103
                pass
1104
        if not limit or limit > 10000:
1105
            limit = 10000
1106
        return start, limit
1107
    
1108
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1109
        cont_prefix = path + '/'
1110
        prefix = cont_prefix + prefix
1111
        start = cont_prefix + marker if marker else None
1112
        before = until if until is not None else inf
1113
        filterq = keys if domain else []
1114
        sizeq = size_range
1115
        
1116
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1117
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1118
        objects.sort(key=lambda x: x[0])
1119
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1120
        return objects
1121
        
1122
    # Reporting functions.
1123
    
1124
    def _report_size_change(self, user, account, size, details={}):
1125
        account_node = self._lookup_account(account, True)[1]
1126
        total = self._get_statistics(account_node)[1]
1127
        details.update({'user': user, 'total': total})
1128
        logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1129
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1130
    
1131
    def _report_object_change(self, user, account, path, details={}):
1132
        details.update({'user': user})
1133
        logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1134
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1135
    
1136
    def _report_sharing_change(self, user, account, path, details={}):
1137
        logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1138
        details.update({'user': user})
1139
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1140
    
1141
    # Policy functions.
1142
    
1143
    def _check_policy(self, policy):
1144
        for k in policy.keys():
1145
            if policy[k] == '':
1146
                policy[k] = self.default_policy.get(k)
1147
        for k, v in policy.iteritems():
1148
            if k == 'quota':
1149
                q = int(v) # May raise ValueError.
1150
                if q < 0:
1151
                    raise ValueError
1152
            elif k == 'versioning':
1153
                if v not in ['auto', 'none']:
1154
                    raise ValueError
1155
            else:
1156
                raise ValueError
1157
    
1158
    def _put_policy(self, node, policy, replace):
1159
        if replace:
1160
            for k, v in self.default_policy.iteritems():
1161
                if k not in policy:
1162
                    policy[k] = v
1163
        self.node.policy_set(node, policy)
1164
    
1165
    def _get_policy(self, node):
1166
        policy = self.default_policy.copy()
1167
        policy.update(self.node.policy_get(node))
1168
        return policy
1169
    
1170
    def _apply_versioning(self, account, container, version_id):
1171
        """Delete the provided version if such is the policy.
1172
           Return size of object removed.
1173
        """
1174
        
1175
        if version_id is None:
1176
            return 0
1177
        path, node = self._lookup_container(account, container)
1178
        versioning = self._get_policy(node)['versioning']
1179
        if versioning != 'auto':
1180
            hash, size = self.node.version_remove(version_id)
1181
            self.store.map_delete(hash)
1182
            return size
1183
        return 0
1184
    
1185
    # Access control functions.
1186
    
1187
    def _check_groups(self, groups):
1188
        # raise ValueError('Bad characters in groups')
1189
        pass
1190
    
1191
    def _check_permissions(self, path, permissions):
1192
        # raise ValueError('Bad characters in permissions')
1193
        pass
1194
    
1195
    def _get_formatted_paths(self, paths):
1196
        formatted = []
1197
        for p in paths:
1198
            node = self.node.node_lookup(p)
1199
            props = None
1200
            if node is not None:
1201
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1202
            if props is not None:
1203
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1204
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1205
                formatted.append((p, self.MATCH_EXACT))
1206
        return formatted
1207
    
1208
    def _get_permissions_path(self, account, container, name):
1209
        path = '/'.join((account, container, name))
1210
        permission_paths = self.permissions.access_inherit(path)
1211
        permission_paths.sort()
1212
        permission_paths.reverse()
1213
        for p in permission_paths:
1214
            if p == path:
1215
                return p
1216
            else:
1217
                if p.count('/') < 2:
1218
                    continue
1219
                node = self.node.node_lookup(p)
1220
                if node is not None:
1221
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1222
                if props is not None:
1223
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1224
                        return p
1225
        return None
1226
    
1227
    def _can_read(self, user, account, container, name):
1228
        if user == account:
1229
            return True
1230
        path = '/'.join((account, container, name))
1231
        if self.permissions.public_get(path) is not None:
1232
            return True
1233
        path = self._get_permissions_path(account, container, name)
1234
        if not path:
1235
            raise NotAllowedError
1236
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1237
            raise NotAllowedError
1238
    
1239
    def _can_write(self, user, account, container, name):
1240
        if user == account:
1241
            return True
1242
        path = '/'.join((account, container, name))
1243
        path = self._get_permissions_path(account, container, name)
1244
        if not path:
1245
            raise NotAllowedError
1246
        if not self.permissions.access_check(path, self.WRITE, user):
1247
            raise NotAllowedError
1248
    
1249
    def _allowed_accounts(self, user):
1250
        allow = set()
1251
        for path in self.permissions.access_list_paths(user):
1252
            allow.add(path.split('/', 1)[0])
1253
        return sorted(allow)
1254
    
1255
    def _allowed_containers(self, user, account):
1256
        allow = set()
1257
        for path in self.permissions.access_list_paths(user, account):
1258
            allow.add(path.split('/', 2)[1])
1259
        return sorted(allow)