Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / modular.py @ 4819d34f

History | View | Annotate | Download (37.5 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import logging
38
import binascii
39

    
40
from base import NotAllowedError, QuotaError, BaseBackend
41

    
42
from pithos.lib.hashmap import HashMap
43

    
44
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
45

    
46
inf = float('inf')
47

    
48
ULTIMATE_ANSWER = 42
49

    
50

    
51
logger = logging.getLogger(__name__)
52

    
53

    
54
def backend_method(func=None, autocommit=1):
55
    if func is None:
56
        def fn(func):
57
            return backend_method(func, autocommit)
58
        return fn
59

    
60
    if not autocommit:
61
        return func
62
    def fn(self, *args, **kw):
63
        self.wrapper.execute()
64
        try:
65
            ret = func(self, *args, **kw)
66
            self.wrapper.commit()
67
            return ret
68
        except:
69
            self.wrapper.rollback()
70
            raise
71
    return fn
72

    
73

    
74
class ModularBackend(BaseBackend):
75
    """A modular backend.
76
    
77
    Uses modules for SQL functions and storage.
78
    """
79
    
80
    def __init__(self, db_module, db_connection, block_module, block_path):
81
        db_module = db_module or 'pithos.backends.lib.sqlalchemy'
82
        block_module = block_module or 'pithos.backends.lib.hashfiler'
83
        
84
        self.hash_algorithm = 'sha256'
85
        self.block_size = 4 * 1024 * 1024 # 4MB
86
        
87
        self.default_policy = {'quota': 0, 'versioning': 'auto'}
88
        
89
        __import__(db_module)
90
        self.db_module = sys.modules[db_module]
91
        self.wrapper = self.db_module.DBWrapper(db_connection)
92
        
93
        params = {'wrapper': self.wrapper}
94
        self.permissions = self.db_module.Permissions(**params)
95
        for x in ['READ', 'WRITE']:
96
            setattr(self, x, getattr(self.db_module, x))
97
        self.node = self.db_module.Node(**params)
98
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
99
            setattr(self, x, getattr(self.db_module, x))
100
        
101
        __import__(block_module)
102
        self.block_module = sys.modules[block_module]
103
        
104
        params = {'path': block_path,
105
                  'block_size': self.block_size,
106
                  'hash_algorithm': self.hash_algorithm}
107
        self.store = self.block_module.Store(**params)
108
    
109
    def close(self):
110
        self.wrapper.close()
111
    
112
    @backend_method
113
    def list_accounts(self, user, marker=None, limit=10000):
114
        """Return a list of accounts the user can access."""
115
        
116
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
117
        allowed = self._allowed_accounts(user)
118
        start, limit = self._list_limits(allowed, marker, limit)
119
        return allowed[start:start + limit]
120
    
121
    @backend_method
122
    def get_account_meta(self, user, account, domain, until=None):
123
        """Return a dictionary with the account metadata for the domain."""
124
        
125
        logger.debug("get_account_meta: %s %s %s", account, domain, until)
126
        path, node = self._lookup_account(account, user == account)
127
        if user != account:
128
            if until or node is None or account not in self._allowed_accounts(user):
129
                raise NotAllowedError
130
        try:
131
            props = self._get_properties(node, until)
132
            mtime = props[self.MTIME]
133
        except NameError:
134
            props = None
135
            mtime = until
136
        count, bytes, tstamp = self._get_statistics(node, until)
137
        tstamp = max(tstamp, mtime)
138
        if until is None:
139
            modified = tstamp
140
        else:
141
            modified = self._get_statistics(node)[2] # Overall last modification.
142
            modified = max(modified, mtime)
143
        
144
        if user != account:
145
            meta = {'name': account}
146
        else:
147
            meta = {}
148
            if props is not None:
149
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
150
            if until is not None:
151
                meta.update({'until_timestamp': tstamp})
152
            meta.update({'name': account, 'count': count, 'bytes': bytes})
153
        meta.update({'modified': modified})
154
        return meta
155
    
156
    @backend_method
157
    def update_account_meta(self, user, account, domain, meta, replace=False):
158
        """Update the metadata associated with the account for the domain."""
159
        
160
        logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
161
        if user != account:
162
            raise NotAllowedError
163
        path, node = self._lookup_account(account, True)
164
        self._put_metadata(user, node, domain, meta, replace)
165
    
166
    @backend_method
167
    def get_account_groups(self, user, account):
168
        """Return a dictionary with the user groups defined for this account."""
169
        
170
        logger.debug("get_account_groups: %s", account)
171
        if user != account:
172
            if account not in self._allowed_accounts(user):
173
                raise NotAllowedError
174
            return {}
175
        self._lookup_account(account, True)
176
        return self.permissions.group_dict(account)
177
    
178
    @backend_method
179
    def update_account_groups(self, user, account, groups, replace=False):
180
        """Update the groups associated with the account."""
181
        
182
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
183
        if user != account:
184
            raise NotAllowedError
185
        self._lookup_account(account, True)
186
        self._check_groups(groups)
187
        if replace:
188
            self.permissions.group_destroy(account)
189
        for k, v in groups.iteritems():
190
            if not replace: # If not already deleted.
191
                self.permissions.group_delete(account, k)
192
            if v:
193
                self.permissions.group_addmany(account, k, v)
194
    
195
    @backend_method
196
    def get_account_policy(self, user, account):
197
        """Return a dictionary with the account policy."""
198
        
199
        logger.debug("get_account_policy: %s", account)
200
        if user != account:
201
            if account not in self._allowed_accounts(user):
202
                raise NotAllowedError
203
            return {}
204
        path, node = self._lookup_account(account, True)
205
        return self._get_policy(node)
206
    
207
    @backend_method
208
    def update_account_policy(self, user, account, policy, replace=False):
209
        """Update the policy associated with the account."""
210
        
211
        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
212
        if user != account:
213
            raise NotAllowedError
214
        path, node = self._lookup_account(account, True)
215
        self._check_policy(policy)
216
        self._put_policy(node, policy, replace)
217
    
218
    @backend_method
219
    def put_account(self, user, account, policy={}):
220
        """Create a new account with the given name."""
221
        
222
        logger.debug("put_account: %s %s", account, policy)
223
        if user != account:
224
            raise NotAllowedError
225
        node = self.node.node_lookup(account)
226
        if node is not None:
227
            raise NameError('Account already exists')
228
        if policy:
229
            self._check_policy(policy)
230
        node = self._put_path(user, self.ROOTNODE, account)
231
        self._put_policy(node, policy, True)
232
    
233
    @backend_method
234
    def delete_account(self, user, account):
235
        """Delete the account with the given name."""
236
        
237
        logger.debug("delete_account: %s", account)
238
        if user != account:
239
            raise NotAllowedError
240
        node = self.node.node_lookup(account)
241
        if node is None:
242
            return
243
        if not self.node.node_remove(node):
244
            raise IndexError('Account is not empty')
245
        self.permissions.group_destroy(account)
246
    
247
    @backend_method
248
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
249
        """Return a list of containers existing under an account."""
250
        
251
        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
252
        if user != account:
253
            if until or account not in self._allowed_accounts(user):
254
                raise NotAllowedError
255
            allowed = self._allowed_containers(user, account)
256
            start, limit = self._list_limits(allowed, marker, limit)
257
            return allowed[start:start + limit]
258
        if shared:
259
            allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
260
            allowed = list(set(allowed))
261
            start, limit = self._list_limits(allowed, marker, limit)
262
            return allowed[start:start + limit]
263
        node = self.node.node_lookup(account)
264
        return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, None, [], until)]
265
    
266
    @backend_method
267
    def get_container_meta(self, user, account, container, domain, until=None):
268
        """Return a dictionary with the container metadata for the domain."""
269
        
270
        logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
271
        if user != account:
272
            if until or container not in self._allowed_containers(user, account):
273
                raise NotAllowedError
274
        path, node = self._lookup_container(account, container)
275
        props = self._get_properties(node, until)
276
        mtime = props[self.MTIME]
277
        count, bytes, tstamp = self._get_statistics(node, until)
278
        tstamp = max(tstamp, mtime)
279
        if until is None:
280
            modified = tstamp
281
        else:
282
            modified = self._get_statistics(node)[2] # Overall last modification.
283
            modified = max(modified, mtime)
284
        
285
        if user != account:
286
            meta = {'name': container}
287
        else:
288
            meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
289
            if until is not None:
290
                meta.update({'until_timestamp': tstamp})
291
            meta.update({'name': container, 'count': count, 'bytes': bytes})
292
        meta.update({'modified': modified})
293
        return meta
294
    
295
    @backend_method
296
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
297
        """Update the metadata associated with the container for the domain."""
298
        
299
        logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
300
        if user != account:
301
            raise NotAllowedError
302
        path, node = self._lookup_container(account, container)
303
        self._put_metadata(user, node, domain, meta, replace)
304
    
305
    @backend_method
306
    def get_container_policy(self, user, account, container):
307
        """Return a dictionary with the container policy."""
308
        
309
        logger.debug("get_container_policy: %s %s", account, container)
310
        if user != account:
311
            if container not in self._allowed_containers(user, account):
312
                raise NotAllowedError
313
            return {}
314
        path, node = self._lookup_container(account, container)
315
        return self._get_policy(node)
316
    
317
    @backend_method
318
    def update_container_policy(self, user, account, container, policy, replace=False):
319
        """Update the policy associated with the container."""
320
        
321
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
322
        if user != account:
323
            raise NotAllowedError
324
        path, node = self._lookup_container(account, container)
325
        self._check_policy(policy)
326
        self._put_policy(node, policy, replace)
327
    
328
    @backend_method
329
    def put_container(self, user, account, container, policy={}):
330
        """Create a new container with the given name."""
331
        
332
        logger.debug("put_container: %s %s %s", account, container, policy)
333
        if user != account:
334
            raise NotAllowedError
335
        try:
336
            path, node = self._lookup_container(account, container)
337
        except NameError:
338
            pass
339
        else:
340
            raise NameError('Container already exists')
341
        if policy:
342
            self._check_policy(policy)
343
        path = '/'.join((account, container))
344
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
345
        self._put_policy(node, policy, True)
346
    
347
    @backend_method
348
    def delete_container(self, user, account, container, until=None):
349
        """Delete/purge the container with the given name."""
350
        
351
        logger.debug("delete_container: %s %s %s", account, container, until)
352
        if user != account:
353
            raise NotAllowedError
354
        path, node = self._lookup_container(account, container)
355
        
356
        if until is not None:
357
            hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
358
            for h in hashes:
359
                self.store.map_delete(h)
360
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
361
            return
362
        
363
        if self._get_statistics(node)[0] > 0:
364
            raise IndexError('Container is not empty')
365
        hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
366
        for h in hashes:
367
            self.store.map_delete(h)
368
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
369
        self.node.node_remove(node)
370
    
371
    @backend_method
372
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None):
373
        """Return a list of objects existing under a container."""
374
        
375
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until)
376
        allowed = []
377
        if user != account:
378
            if until:
379
                raise NotAllowedError
380
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
381
            if not allowed:
382
                raise NotAllowedError
383
        else:
384
            if shared:
385
                allowed = self.permissions.access_list_shared('/'.join((account, container)))
386
                if not allowed:
387
                    return []
388
        path, node = self._lookup_container(account, container)
389
        return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, allowed)
390
    
391
    @backend_method
392
    def list_object_meta(self, user, account, container, domain, until=None):
393
        """Return a list with all the container's object meta keys for the domain."""
394
        
395
        logger.debug("list_object_meta: %s %s %s %s", account, container, domain, until)
396
        allowed = []
397
        if user != account:
398
            if until:
399
                raise NotAllowedError
400
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
401
            if not allowed:
402
                raise NotAllowedError
403
        path, node = self._lookup_container(account, container)
404
        before = until if until is not None else inf
405
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
406
    
407
    @backend_method
408
    def get_object_meta(self, user, account, container, name, domain, version=None):
409
        """Return a dictionary with the object metadata for the domain."""
410
        
411
        logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
412
        self._can_read(user, account, container, name)
413
        path, node = self._lookup_object(account, container, name)
414
        props = self._get_version(node, version)
415
        if version is None:
416
            modified = props[self.MTIME]
417
        else:
418
            try:
419
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
420
            except NameError: # Object may be deleted.
421
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
422
                if del_props is None:
423
                    raise NameError('Object does not exist')
424
                modified = del_props[self.MTIME]
425
        
426
        meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
427
        meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
428
        meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
429
        meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
430
        return meta
431
    
432
    @backend_method
433
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
434
        """Update the metadata associated with the object for the domain and return the new version."""
435
        
436
        logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
437
        self._can_write(user, account, container, name)
438
        path, node = self._lookup_object(account, container, name)
439
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
440
        self._apply_versioning(account, container, src_version_id)
441
        return dest_version_id
442
    
443
    @backend_method
444
    def get_object_permissions(self, user, account, container, name):
445
        """Return the action allowed on the object, the path
446
        from which the object gets its permissions from,
447
        along with a dictionary containing the permissions."""
448
        
449
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
450
        allowed = 'write'
451
        if user != account:
452
            path = '/'.join((account, container, name))
453
            if self.permissions.access_check(path, self.WRITE, user):
454
                allowed = 'write'
455
            elif self.permissions.access_check(path, self.READ, user):
456
                allowed = 'read'
457
            else:
458
                raise NotAllowedError
459
        path = self._lookup_object(account, container, name)[0]
460
        return (allowed,) + self.permissions.access_inherit(path)
461
    
462
    @backend_method
463
    def update_object_permissions(self, user, account, container, name, permissions):
464
        """Update the permissions associated with the object."""
465
        
466
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
467
        if user != account:
468
            raise NotAllowedError
469
        path = self._lookup_object(account, container, name)[0]
470
        self._check_permissions(path, permissions)
471
        self.permissions.access_set(path, permissions)
472
    
473
    @backend_method
474
    def get_object_public(self, user, account, container, name):
475
        """Return the public id of the object if applicable."""
476
        
477
        logger.debug("get_object_public: %s %s %s", account, container, name)
478
        self._can_read(user, account, container, name)
479
        path = self._lookup_object(account, container, name)[0]
480
        p = self.permissions.public_get(path)
481
        if p is not None:
482
            p += ULTIMATE_ANSWER
483
        return p
484
    
485
    @backend_method
486
    def update_object_public(self, user, account, container, name, public):
487
        """Update the public status of the object."""
488
        
489
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
490
        self._can_write(user, account, container, name)
491
        path = self._lookup_object(account, container, name)[0]
492
        if not public:
493
            self.permissions.public_unset(path)
494
        else:
495
            self.permissions.public_set(path)
496
    
497
    @backend_method
498
    def get_object_hashmap(self, user, account, container, name, version=None):
499
        """Return the object's size and a list with partial hashes."""
500
        
501
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
502
        self._can_read(user, account, container, name)
503
        path, node = self._lookup_object(account, container, name)
504
        props = self._get_version(node, version)
505
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
506
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
507
    
508
    def _update_object_hash(self, user, account, container, name, size, hash, permissions=None):
509
        if permissions is not None and user != account:
510
            raise NotAllowedError
511
        self._can_write(user, account, container, name)
512
        if permissions is not None:
513
            path = '/'.join((account, container, name))
514
            self._check_permissions(path, permissions)
515
        
516
        account_path, account_node = self._lookup_account(account, True)
517
        container_path, container_node = self._lookup_container(account, container)
518
        path, node = self._put_object_node(container_path, container_node, name)
519
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size, hash)
520
        
521
        # Check quota.
522
        size_delta = size # Change with versioning.
523
        if size_delta > 0:
524
            account_quota = long(self._get_policy(account_node)['quota'])
525
            container_quota = long(self._get_policy(container_node)['quota'])
526
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
527
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
528
                # This must be executed in a transaction, so the version is never created if it fails.
529
                raise QuotaError
530
        
531
        if permissions is not None:
532
            self.permissions.access_set(path, permissions)
533
        self._apply_versioning(account, container, src_version_id)
534
        return src_version_id, dest_version_id
535
    
536
    @backend_method
537
    def update_object_hashmap(self, user, account, container, name, size, hashmap, domain, meta={}, replace_meta=False, permissions=None):
538
        """Create/update an object with the specified size and partial hashes."""
539
        
540
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
541
        if size == 0: # No such thing as an empty hashmap.
542
            hashmap = [self.put_block('')]
543
        map = HashMap(self.block_size, self.hash_algorithm)
544
        map.extend([binascii.unhexlify(x) for x in hashmap])
545
        missing = self.store.block_search(map)
546
        if missing:
547
            ie = IndexError()
548
            ie.data = [binascii.hexlify(x) for x in missing]
549
            raise ie
550
        
551
        hash = map.hash()
552
        src_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
553
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
554
        self.store.map_put(hash, map)
555
        return dest_version_id
556
    
557
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
558
        self._can_read(user, src_account, src_container, src_name)
559
        path, node = self._lookup_object(src_account, src_container, src_name)
560
        props = self._get_version(node, src_version)
561
        src_version_id = props[self.SERIAL]
562
        hash = props[self.HASH]
563
        size = props[self.SIZE]
564
        
565
        src_v_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, permissions)
566
        self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
567
        return dest_version_id
568
    
569
    @backend_method
570
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
571
        """Copy an object's data and metadata."""
572
        
573
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
574
        return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
575
    
576
    @backend_method
577
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None):
578
        """Move an object's data and metadata."""
579
        
580
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions)
581
        if user != src_account:
582
            raise NotAllowedError
583
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None)
584
        if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
585
            self._delete_object(user, src_account, src_container, src_name)
586
        return dest_version_id
587
    
588
    def _delete_object(self, user, account, container, name, until=None):
589
        if user != account:
590
            raise NotAllowedError
591
        
592
        if until is not None:
593
            path = '/'.join((account, container, name))
594
            node = self.node.node_lookup(path)
595
            if node is None:
596
                return
597
            hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
598
            hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
599
            for h in hashes:
600
                self.store.map_delete(h)
601
            self.node.node_purge(node, until, CLUSTER_DELETED)
602
            try:
603
                props = self._get_version(node)
604
            except NameError:
605
                self.permissions.access_clear(path)
606
            return
607
        
608
        path, node = self._lookup_object(account, container, name)
609
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, 0, None, CLUSTER_DELETED)
610
        self._apply_versioning(account, container, src_version_id)
611
        self.permissions.access_clear(path)
612
    
613
    @backend_method
614
    def delete_object(self, user, account, container, name, until=None):
615
        """Delete/purge an object."""
616
        
617
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
618
        self._delete_object(user, account, container, name, until)
619
    
620
    @backend_method
621
    def list_versions(self, user, account, container, name):
622
        """Return a list of all (version, version_timestamp) tuples for an object."""
623
        
624
        logger.debug("list_versions: %s %s %s", account, container, name)
625
        self._can_read(user, account, container, name)
626
        path, node = self._lookup_object(account, container, name)
627
        versions = self.node.node_get_versions(node)
628
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
629
    
630
    @backend_method
631
    def get_public(self, user, public):
632
        """Return the (account, container, name) for the public id given."""
633
        logger.debug("get_public: %s", public)
634
        if public is None or public < ULTIMATE_ANSWER:
635
            raise NameError
636
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
637
        account, container, name = path.split('/', 2)
638
        self._can_read(user, account, container, name)
639
        return (account, container, name)
640
    
641
    @backend_method(autocommit=0)
642
    def get_block(self, hash):
643
        """Return a block's data."""
644
        
645
        logger.debug("get_block: %s", hash)
646
        block = self.store.block_get(binascii.unhexlify(hash))
647
        if not block:
648
            raise NameError('Block does not exist')
649
        return block
650
    
651
    @backend_method(autocommit=0)
652
    def put_block(self, data):
653
        """Store a block and return the hash."""
654
        
655
        logger.debug("put_block: %s", len(data))
656
        return binascii.hexlify(self.store.block_put(data))
657
    
658
    @backend_method(autocommit=0)
659
    def update_block(self, hash, data, offset=0):
660
        """Update a known block and return the hash."""
661
        
662
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
663
        if offset == 0 and len(data) == self.block_size:
664
            return self.put_block(data)
665
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
666
        return binascii.hexlify(h)
667
    
668
    # Path functions.
669
    
670
    def _put_object_node(self, path, parent, name):
671
        path = '/'.join((path, name))
672
        node = self.node.node_lookup(path)
673
        if node is None:
674
            node = self.node.node_create(parent, path)
675
        return path, node
676
    
677
    def _put_path(self, user, parent, path):
678
        node = self.node.node_create(parent, path)
679
        self.node.version_create(node, None, 0, None, user, CLUSTER_NORMAL)
680
        return node
681
    
682
    def _lookup_account(self, account, create=True):
683
        node = self.node.node_lookup(account)
684
        if node is None and create:
685
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
686
        return account, node
687
    
688
    def _lookup_container(self, account, container):
689
        path = '/'.join((account, container))
690
        node = self.node.node_lookup(path)
691
        if node is None:
692
            raise NameError('Container does not exist')
693
        return path, node
694
    
695
    def _lookup_object(self, account, container, name):
696
        path = '/'.join((account, container, name))
697
        node = self.node.node_lookup(path)
698
        if node is None:
699
            raise NameError('Object does not exist')
700
        return path, node
701
    
702
    def _get_properties(self, node, until=None):
703
        """Return properties until the timestamp given."""
704
        
705
        before = until if until is not None else inf
706
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
707
        if props is None and until is not None:
708
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
709
        if props is None:
710
            raise NameError('Path does not exist')
711
        return props
712
    
713
    def _get_statistics(self, node, until=None):
714
        """Return count, sum of size and latest timestamp of everything under node."""
715
        
716
        if until is None:
717
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
718
        else:
719
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
720
        if stats is None:
721
            stats = (0, 0, 0)
722
        return stats
723
    
724
    def _get_version(self, node, version=None):
725
        if version is None:
726
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
727
            if props is None:
728
                raise NameError('Object does not exist')
729
        else:
730
            try:
731
                version = int(version)
732
            except ValueError:
733
                raise IndexError('Version does not exist')
734
            props = self.node.version_get_properties(version)
735
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
736
                raise IndexError('Version does not exist')
737
        return props
738
    
739
    def _put_version_duplicate(self, user, node, size=None, hash=None, cluster=CLUSTER_NORMAL):
740
        """Create a new version of the node."""
741
        
742
        props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
743
        if props is not None:
744
            src_version_id = props[self.SERIAL]
745
            src_hash = props[self.HASH]
746
            src_size = props[self.SIZE]
747
        else:
748
            src_version_id = None
749
            src_hash = None
750
            src_size = 0
751
        if size is None:
752
            hash = src_hash # This way hash can be set to None.
753
            size = src_size
754
        
755
        if src_version_id is not None:
756
            self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
757
        dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, cluster)
758
        return src_version_id, dest_version_id
759
    
760
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
761
        if src_version_id is not None:
762
            self.node.attribute_copy(src_version_id, dest_version_id)
763
        if not replace:
764
            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
765
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
766
        else:
767
            self.node.attribute_del(dest_version_id, domain)
768
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
769
    
770
    def _put_metadata(self, user, node, domain, meta, replace=False):
771
        """Create a new version and store metadata."""
772
        
773
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
774
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
775
        return src_version_id, dest_version_id
776
    
777
    def _list_limits(self, listing, marker, limit):
778
        start = 0
779
        if marker:
780
            try:
781
                start = listing.index(marker) + 1
782
            except ValueError:
783
                pass
784
        if not limit or limit > 10000:
785
            limit = 10000
786
        return start, limit
787
    
788
    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, allowed=[]):
789
        cont_prefix = path + '/'
790
        prefix = cont_prefix + prefix
791
        start = cont_prefix + marker if marker else None
792
        before = until if until is not None else inf
793
        filterq = keys if domain else []
794
        
795
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq)
796
        objects.extend([(p, None) for p in prefixes] if virtual else [])
797
        objects.sort(key=lambda x: x[0])
798
        objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
799
        
800
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
801
        return objects[start:start + limit]
802
    
803
    # Policy functions.
804
    
805
    def _check_policy(self, policy):
806
        for k in policy.keys():
807
            if policy[k] == '':
808
                policy[k] = self.default_policy.get(k)
809
        for k, v in policy.iteritems():
810
            if k == 'quota':
811
                q = int(v) # May raise ValueError.
812
                if q < 0:
813
                    raise ValueError
814
            elif k == 'versioning':
815
                if v not in ['auto', 'none']:
816
                    raise ValueError
817
            else:
818
                raise ValueError
819
    
820
    def _put_policy(self, node, policy, replace):
821
        if replace:
822
            for k, v in self.default_policy.iteritems():
823
                if k not in policy:
824
                    policy[k] = v
825
        self.node.policy_set(node, policy)
826
    
827
    def _get_policy(self, node):
828
        policy = self.default_policy.copy()
829
        policy.update(self.node.policy_get(node))
830
        return policy
831
    
832
    def _apply_versioning(self, account, container, version_id):
833
        if version_id is None:
834
            return
835
        path, node = self._lookup_container(account, container)
836
        versioning = self._get_policy(node)['versioning']
837
        if versioning != 'auto':
838
            hash = self.node.version_remove(version_id)
839
            self.store.map_delete(hash)
840
    
841
    # Access control functions.
842
    
843
    def _check_groups(self, groups):
844
        # raise ValueError('Bad characters in groups')
845
        pass
846
    
847
    def _check_permissions(self, path, permissions):
848
        # raise ValueError('Bad characters in permissions')
849
        
850
        # Check for existing permissions.
851
        paths = self.permissions.access_list(path)
852
        if paths:
853
            ae = AttributeError()
854
            ae.data = paths
855
            raise ae
856
    
857
    def _can_read(self, user, account, container, name):
858
        if user == account:
859
            return True
860
        path = '/'.join((account, container, name))
861
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
862
            raise NotAllowedError
863
    
864
    def _can_write(self, user, account, container, name):
865
        if user == account:
866
            return True
867
        path = '/'.join((account, container, name))
868
        if not self.permissions.access_check(path, self.WRITE, user):
869
            raise NotAllowedError
870
    
871
    def _allowed_accounts(self, user):
872
        allow = set()
873
        for path in self.permissions.access_list_paths(user):
874
            allow.add(path.split('/', 1)[0])
875
        return sorted(allow)
876
    
877
    def _allowed_containers(self, user, account):
878
        allow = set()
879
        for path in self.permissions.access_list_paths(user, account):
880
            allow.add(path.split('/', 2)[1])
881
        return sorted(allow)