Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / modular.py @ 5a96180b

History | View | Annotate | Download (37.3 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import logging
38
import binascii
39

    
40
from base import NotAllowedError, QuotaError, BaseBackend
41

    
42
from pithos.lib.hashmap import HashMap
43

    
44
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
45

    
46
inf = float('inf')
47

    
48
ULTIMATE_ANSWER = 42
49

    
50

    
51
logger = logging.getLogger(__name__)
52

    
53

    
54
def backend_method(func=None, autocommit=1):
55
    if func is None:
56
        def fn(func):
57
            return backend_method(func, autocommit)
58
        return fn
59

    
60
    if not autocommit:
61
        return func
62
    def fn(self, *args, **kw):
63
        self.wrapper.execute()
64
        try:
65
            ret = func(self, *args, **kw)
66
            self.wrapper.commit()
67
            return ret
68
        except:
69
            self.wrapper.rollback()
70
            raise
71
    return fn
72

    
73

    
74
class ModularBackend(BaseBackend):
75
    """A modular backend.
76
    
77
    Uses modules for SQL functions and storage.
78
    """
79
    
80
    def __init__(self, db_module, db_connection, block_module, block_path):
81
        self.hash_algorithm = 'sha256'
82
        self.block_size = 4 * 1024 * 1024 # 4MB
83
        
84
        self.default_policy = {'quota': 0, 'versioning': 'auto'}
85
        
86
        __import__(db_module)
87
        self.db_module = sys.modules[db_module]
88
        self.wrapper = self.db_module.DBWrapper(db_connection)
89
        
90
        params = {'wrapper': self.wrapper}
91
        self.permissions = self.db_module.Permissions(**params)
92
        for x in ['READ', 'WRITE']:
93
            setattr(self, x, getattr(self.db_module, x))
94
        self.node = self.db_module.Node(**params)
95
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
96
            setattr(self, x, getattr(self.db_module, x))
97
        
98
        __import__(block_module)
99
        self.block_module = sys.modules[block_module]
100
        
101
        params = {'path': block_path,
102
                  'block_size': self.block_size,
103
                  'hash_algorithm': self.hash_algorithm}
104
        self.store = self.block_module.Store(**params)
105
    
106
    def close(self):
107
        self.wrapper.close()
108
    
109
    @backend_method
110
    def list_accounts(self, user, marker=None, limit=10000):
111
        """Return a list of accounts the user can access."""
112
        
113
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
114
        allowed = self._allowed_accounts(user)
115
        start, limit = self._list_limits(allowed, marker, limit)
116
        return allowed[start:start + limit]
117
    
118
    @backend_method
119
    def get_account_meta(self, user, account, until=None):
120
        """Return a dictionary with the account metadata."""
121
        
122
        logger.debug("get_account_meta: %s %s", account, until)
123
        path, node = self._lookup_account(account, user == account)
124
        if user != account:
125
            if until or node is None or account not in self._allowed_accounts(user):
126
                raise NotAllowedError
127
        try:
128
            props = self._get_properties(node, until)
129
            mtime = props[self.MTIME]
130
        except NameError:
131
            props = None
132
            mtime = until
133
        count, bytes, tstamp = self._get_statistics(node, until)
134
        tstamp = max(tstamp, mtime)
135
        if until is None:
136
            modified = tstamp
137
        else:
138
            modified = self._get_statistics(node)[2] # Overall last modification.
139
            modified = max(modified, mtime)
140
        
141
        if user != account:
142
            meta = {'name': account}
143
        else:
144
            meta = {}
145
            if props is not None:
146
                meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
147
            if until is not None:
148
                meta.update({'until_timestamp': tstamp})
149
            meta.update({'name': account, 'count': count, 'bytes': bytes})
150
        meta.update({'modified': modified})
151
        return meta
152
    
153
    @backend_method
154
    def update_account_meta(self, user, account, meta, replace=False):
155
        """Update the metadata associated with the account."""
156
        
157
        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
158
        if user != account:
159
            raise NotAllowedError
160
        path, node = self._lookup_account(account, True)
161
        self._put_metadata(user, node, meta, replace)
162
    
163
    @backend_method
164
    def get_account_groups(self, user, account):
165
        """Return a dictionary with the user groups defined for this account."""
166
        
167
        logger.debug("get_account_groups: %s", account)
168
        if user != account:
169
            if account not in self._allowed_accounts(user):
170
                raise NotAllowedError
171
            return {}
172
        self._lookup_account(account, True)
173
        return self.permissions.group_dict(account)
174
    
175
    @backend_method
176
    def update_account_groups(self, user, account, groups, replace=False):
177
        """Update the groups associated with the account."""
178
        
179
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
180
        if user != account:
181
            raise NotAllowedError
182
        self._lookup_account(account, True)
183
        self._check_groups(groups)
184
        if replace:
185
            self.permissions.group_destroy(account)
186
        for k, v in groups.iteritems():
187
            if not replace: # If not already deleted.
188
                self.permissions.group_delete(account, k)
189
            if v:
190
                self.permissions.group_addmany(account, k, v)
191
    
192
    @backend_method
193
    def get_account_policy(self, user, account):
194
        """Return a dictionary with the account policy."""
195
        
196
        logger.debug("get_account_policy: %s", account)
197
        if user != account:
198
            if account not in self._allowed_accounts(user):
199
                raise NotAllowedError
200
            return {}
201
        path, node = self._lookup_account(account, True)
202
        return self._get_policy(node)
203
    
204
    @backend_method
205
    def update_account_policy(self, user, account, policy, replace=False):
206
        """Update the policy associated with the account."""
207
        
208
        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
209
        if user != account:
210
            raise NotAllowedError
211
        path, node = self._lookup_account(account, True)
212
        self._check_policy(policy)
213
        self._put_policy(node, policy, replace)
214
    
215
    @backend_method
216
    def put_account(self, user, account, policy={}):
217
        """Create a new account with the given name."""
218
        
219
        logger.debug("put_account: %s %s", account, policy)
220
        if user != account:
221
            raise NotAllowedError
222
        node = self.node.node_lookup(account)
223
        if node is not None:
224
            raise NameError('Account already exists')
225
        if policy:
226
            self._check_policy(policy)
227
        node = self._put_path(user, self.ROOTNODE, account)
228
        self._put_policy(node, policy, True)
229
    
230
    @backend_method
231
    def delete_account(self, user, account):
232
        """Delete the account with the given name."""
233
        
234
        logger.debug("delete_account: %s", account)
235
        if user != account:
236
            raise NotAllowedError
237
        node = self.node.node_lookup(account)
238
        if node is None:
239
            return
240
        if not self.node.node_remove(node):
241
            raise IndexError('Account is not empty')
242
        self.permissions.group_destroy(account)
243
    
244
    @backend_method
245
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
246
        """Return a list of containers existing under an account."""
247
        
248
        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
249
        if user != account:
250
            if until or account not in self._allowed_accounts(user):
251
                raise NotAllowedError
252
            allowed = self._allowed_containers(user, account)
253
            start, limit = self._list_limits(allowed, marker, limit)
254
            return allowed[start:start + limit]
255
        if shared:
256
            allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
257
            allowed = list(set(allowed))
258
            start, limit = self._list_limits(allowed, marker, limit)
259
            return allowed[start:start + limit]
260
        node = self.node.node_lookup(account)
261
        return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
262
    
263
    @backend_method
264
    def get_container_meta(self, user, account, container, until=None):
265
        """Return a dictionary with the container metadata."""
266
        
267
        logger.debug("get_container_meta: %s %s %s", account, container, until)
268
        if user != account:
269
            if until or container not in self._allowed_containers(user, account):
270
                raise NotAllowedError
271
        path, node = self._lookup_container(account, container)
272
        props = self._get_properties(node, until)
273
        mtime = props[self.MTIME]
274
        count, bytes, tstamp = self._get_statistics(node, until)
275
        tstamp = max(tstamp, mtime)
276
        if until is None:
277
            modified = tstamp
278
        else:
279
            modified = self._get_statistics(node)[2] # Overall last modification.
280
            modified = max(modified, mtime)
281
        
282
        if user != account:
283
            meta = {'name': container}
284
        else:
285
            meta = dict(self.node.attribute_get(props[self.SERIAL]))
286
            if until is not None:
287
                meta.update({'until_timestamp': tstamp})
288
            meta.update({'name': container, 'count': count, 'bytes': bytes})
289
        meta.update({'modified': modified})
290
        return meta
291
    
292
    @backend_method
293
    def update_container_meta(self, user, account, container, meta, replace=False):
294
        """Update the metadata associated with the container."""
295
        
296
        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
297
        if user != account:
298
            raise NotAllowedError
299
        path, node = self._lookup_container(account, container)
300
        self._put_metadata(user, node, meta, replace)
301
    
302
    @backend_method
303
    def get_container_policy(self, user, account, container):
304
        """Return a dictionary with the container policy."""
305
        
306
        logger.debug("get_container_policy: %s %s", account, container)
307
        if user != account:
308
            if container not in self._allowed_containers(user, account):
309
                raise NotAllowedError
310
            return {}
311
        path, node = self._lookup_container(account, container)
312
        return self._get_policy(node)
313
    
314
    @backend_method
315
    def update_container_policy(self, user, account, container, policy, replace=False):
316
        """Update the policy associated with the container."""
317
        
318
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
319
        if user != account:
320
            raise NotAllowedError
321
        path, node = self._lookup_container(account, container)
322
        self._check_policy(policy)
323
        self._put_policy(node, policy, replace)
324
    
325
    @backend_method
326
    def put_container(self, user, account, container, policy={}):
327
        """Create a new container with the given name."""
328
        
329
        logger.debug("put_container: %s %s %s", account, container, policy)
330
        if user != account:
331
            raise NotAllowedError
332
        try:
333
            path, node = self._lookup_container(account, container)
334
        except NameError:
335
            pass
336
        else:
337
            raise NameError('Container already exists')
338
        if policy:
339
            self._check_policy(policy)
340
        path = '/'.join((account, container))
341
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
342
        self._put_policy(node, policy, True)
343
    
344
    @backend_method
345
    def delete_container(self, user, account, container, until=None):
346
        """Delete/purge the container with the given name."""
347
        
348
        logger.debug("delete_container: %s %s %s", account, container, until)
349
        if user != account:
350
            raise NotAllowedError
351
        path, node = self._lookup_container(account, container)
352
        
353
        if until is not None:
354
            hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
355
            for h in hashes:
356
                self.store.map_delete(h)
357
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
358
            return
359
        
360
        if self._get_statistics(node)[0] > 0:
361
            raise IndexError('Container is not empty')
362
        hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
363
        for h in hashes:
364
            self.store.map_delete(h)
365
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
366
        self.node.node_remove(node)
367
    
368
    @backend_method
369
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
370
        """Return a list of objects existing under a container."""
371
        
372
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
373
        allowed = []
374
        if user != account:
375
            if until:
376
                raise NotAllowedError
377
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
378
            if not allowed:
379
                raise NotAllowedError
380
        else:
381
            if shared:
382
                allowed = self.permissions.access_list_shared('/'.join((account, container)))
383
                if not allowed:
384
                    return []
385
        path, node = self._lookup_container(account, container)
386
        return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
387
    
388
    @backend_method
389
    def list_object_meta(self, user, account, container, until=None):
390
        """Return a list with all the container's object meta keys."""
391
        
392
        logger.debug("list_object_meta: %s %s %s", account, container, until)
393
        allowed = []
394
        if user != account:
395
            if until:
396
                raise NotAllowedError
397
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
398
            if not allowed:
399
                raise NotAllowedError
400
        path, node = self._lookup_container(account, container)
401
        before = until if until is not None else inf
402
        return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
403
    
404
    @backend_method
405
    def get_object_meta(self, user, account, container, name, version=None):
406
        """Return a dictionary with the object metadata."""
407
        
408
        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
409
        self._can_read(user, account, container, name)
410
        path, node = self._lookup_object(account, container, name)
411
        props = self._get_version(node, version)
412
        if version is None:
413
            modified = props[self.MTIME]
414
        else:
415
            try:
416
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
417
            except NameError: # Object may be deleted.
418
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
419
                if del_props is None:
420
                    raise NameError('Object does not exist')
421
                modified = del_props[self.MTIME]
422
        
423
        meta = dict(self.node.attribute_get(props[self.SERIAL]))
424
        meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
425
        meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
426
        meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
427
        return meta
428
    
429
    @backend_method
430
    def update_object_meta(self, user, account, container, name, meta, replace=False):
431
        """Update the metadata associated with the object."""
432
        
433
        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
434
        self._can_write(user, account, container, name)
435
        path, node = self._lookup_object(account, container, name)
436
        src_version_id, dest_version_id = self._put_metadata(user, node, meta, replace)
437
        self._apply_versioning(account, container, src_version_id)
438
        return dest_version_id
439
    
440
    @backend_method
441
    def get_object_permissions(self, user, account, container, name):
442
        """Return the action allowed on the object, the path
443
        from which the object gets its permissions from,
444
        along with a dictionary containing the permissions."""
445
        
446
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
447
        allowed = 'write'
448
        if user != account:
449
            path = '/'.join((account, container, name))
450
            if self.permissions.access_check(path, self.WRITE, user):
451
                allowed = 'write'
452
            elif self.permissions.access_check(path, self.READ, user):
453
                allowed = 'read'
454
            else:
455
                raise NotAllowedError
456
        path = self._lookup_object(account, container, name)[0]
457
        return (allowed,) + self.permissions.access_inherit(path)
458
    
459
    @backend_method
460
    def update_object_permissions(self, user, account, container, name, permissions):
461
        """Update the permissions associated with the object."""
462
        
463
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
464
        if user != account:
465
            raise NotAllowedError
466
        path = self._lookup_object(account, container, name)[0]
467
        self._check_permissions(path, permissions)
468
        self.permissions.access_set(path, permissions)
469
    
470
    @backend_method
471
    def get_object_public(self, user, account, container, name):
472
        """Return the public id of the object if applicable."""
473
        
474
        logger.debug("get_object_public: %s %s %s", account, container, name)
475
        self._can_read(user, account, container, name)
476
        path = self._lookup_object(account, container, name)[0]
477
        p = self.permissions.public_get(path)
478
        if p is not None:
479
            p += ULTIMATE_ANSWER
480
        return p
481
    
482
    @backend_method
483
    def update_object_public(self, user, account, container, name, public):
484
        """Update the public status of the object."""
485
        
486
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
487
        self._can_write(user, account, container, name)
488
        path = self._lookup_object(account, container, name)[0]
489
        if not public:
490
            self.permissions.public_unset(path)
491
        else:
492
            self.permissions.public_set(path)
493
    
494
    @backend_method
495
    def get_object_hashmap(self, user, account, container, name, version=None):
496
        """Return the object's size and a list with partial hashes."""
497
        
498
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
499
        self._can_read(user, account, container, name)
500
        path, node = self._lookup_object(account, container, name)
501
        props = self._get_version(node, version)
502
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
503
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
504
    
505
    def _update_object_hash(self, user, account, container, name, size, hash, meta={}, replace_meta=False, permissions=None):
506
        if permissions is not None and user != account:
507
            raise NotAllowedError
508
        self._can_write(user, account, container, name)
509
        if permissions is not None:
510
            path = '/'.join((account, container, name))
511
            self._check_permissions(path, permissions)
512
        
513
        account_path, account_node = self._lookup_account(account, True)
514
        container_path, container_node = self._lookup_container(account, container)
515
        path, node = self._put_object_node(container_path, container_node, name)
516
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size, hash)
517
        
518
        # Check quota.
519
        size_delta = size # Change with versioning.
520
        if size_delta > 0:
521
            account_quota = long(self._get_policy(account_node)['quota'])
522
            container_quota = long(self._get_policy(container_node)['quota'])
523
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
524
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
525
                # This must be executed in a transaction, so the version is never created if it fails.
526
                raise QuotaError
527
        
528
        if not replace_meta and src_version_id is not None:
529
            self.node.attribute_copy(src_version_id, dest_version_id)
530
        self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
531
        if permissions is not None:
532
            self.permissions.access_set(path, permissions)
533
        self._apply_versioning(account, container, src_version_id)
534
        return dest_version_id
535
    
536
    @backend_method
537
    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
538
        """Create/update an object with the specified size and partial hashes."""
539
        
540
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
541
        if size == 0: # No such thing as an empty hashmap.
542
            hashmap = [self.put_block('')]
543
        map = HashMap(self.block_size, self.hash_algorithm)
544
        map.extend([binascii.unhexlify(x) for x in hashmap])
545
        missing = self.store.block_search(map)
546
        if missing:
547
            ie = IndexError()
548
            ie.data = [binascii.hexlify(x) for x in missing]
549
            raise ie
550
        
551
        hash = map.hash()
552
        dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), meta, replace_meta, permissions)
553
        self.store.map_put(hash, map)
554
        return dest_version_id
555
    
556
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
557
        self._can_read(user, src_account, src_container, src_name)
558
        path, node = self._lookup_object(src_account, src_container, src_name)
559
        props = self._get_version(node, src_version)
560
        src_version_id = props[self.SERIAL]
561
        hash = props[self.HASH]
562
        size = props[self.SIZE]
563
        
564
        if (src_account, src_container, src_name) == (dest_account, dest_container, dest_name):
565
            dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, dest_meta, replace_meta, permissions)
566
        else:
567
            if replace_meta:
568
                meta = dest_meta
569
            else:
570
                meta = {}
571
            dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, meta, True, permissions)
572
            if not replace_meta:
573
                self.node.attribute_copy(src_version_id, dest_version_id)
574
                self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
575
        return dest_version_id
576
    
577
    @backend_method
578
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
579
        """Copy an object's data and metadata."""
580
        
581
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
582
        return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
583
    
584
    @backend_method
585
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
586
        """Move an object's data and metadata."""
587
        
588
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions)
589
        if user != src_account:
590
            raise NotAllowedError
591
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
592
        if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
593
            self._delete_object(user, src_account, src_container, src_name)
594
        return dest_version_id
595
    
596
    def _delete_object(self, user, account, container, name, until=None):
597
        if user != account:
598
            raise NotAllowedError
599
        
600
        if until is not None:
601
            path = '/'.join((account, container, name))
602
            node = self.node.node_lookup(path)
603
            if node is None:
604
                return
605
            hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
606
            hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
607
            for h in hashes:
608
                self.store.map_delete(h)
609
            self.node.node_purge(node, until, CLUSTER_DELETED)
610
            try:
611
                props = self._get_version(node)
612
            except NameError:
613
                self.permissions.access_clear(path)
614
            return
615
        
616
        path, node = self._lookup_object(account, container, name)
617
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, 0, None, CLUSTER_DELETED)
618
        self._apply_versioning(account, container, src_version_id)
619
        self.permissions.access_clear(path)
620
    
621
    @backend_method
622
    def delete_object(self, user, account, container, name, until=None):
623
        """Delete/purge an object."""
624
        
625
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
626
        self._delete_object(user, account, container, name, until)
627
    
628
    @backend_method
629
    def list_versions(self, user, account, container, name):
630
        """Return a list of all (version, version_timestamp) tuples for an object."""
631
        
632
        logger.debug("list_versions: %s %s %s", account, container, name)
633
        self._can_read(user, account, container, name)
634
        path, node = self._lookup_object(account, container, name)
635
        versions = self.node.node_get_versions(node)
636
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
637
    
638
    @backend_method
639
    def get_public(self, user, public):
640
        """Return the (account, container, name) for the public id given."""
641
        logger.debug("get_public: %s", public)
642
        if public is None or public < ULTIMATE_ANSWER:
643
            raise NameError
644
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
645
        account, container, name = path.split('/', 2)
646
        self._can_read(user, account, container, name)
647
        return (account, container, name)
648
    
649
    @backend_method(autocommit=0)
650
    def get_block(self, hash):
651
        """Return a block's data."""
652
        
653
        logger.debug("get_block: %s", hash)
654
        block = self.store.block_get(binascii.unhexlify(hash))
655
        if not block:
656
            raise NameError('Block does not exist')
657
        return block
658
    
659
    @backend_method(autocommit=0)
660
    def put_block(self, data):
661
        """Store a block and return the hash."""
662
        
663
        logger.debug("put_block: %s", len(data))
664
        return binascii.hexlify(self.store.block_put(data))
665
    
666
    @backend_method(autocommit=0)
667
    def update_block(self, hash, data, offset=0):
668
        """Update a known block and return the hash."""
669
        
670
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
671
        if offset == 0 and len(data) == self.block_size:
672
            return self.put_block(data)
673
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
674
        return binascii.hexlify(h)
675
    
676
    # Path functions.
677
    
678
    def _put_object_node(self, path, parent, name):
679
        path = '/'.join((path, name))
680
        node = self.node.node_lookup(path)
681
        if node is None:
682
            node = self.node.node_create(parent, path)
683
        return path, node
684
    
685
    def _put_path(self, user, parent, path):
686
        node = self.node.node_create(parent, path)
687
        self.node.version_create(node, None, 0, None, user, CLUSTER_NORMAL)
688
        return node
689
    
690
    def _lookup_account(self, account, create=True):
691
        node = self.node.node_lookup(account)
692
        if node is None and create:
693
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
694
        return account, node
695
    
696
    def _lookup_container(self, account, container):
697
        path = '/'.join((account, container))
698
        node = self.node.node_lookup(path)
699
        if node is None:
700
            raise NameError('Container does not exist')
701
        return path, node
702
    
703
    def _lookup_object(self, account, container, name):
704
        path = '/'.join((account, container, name))
705
        node = self.node.node_lookup(path)
706
        if node is None:
707
            raise NameError('Object does not exist')
708
        return path, node
709
    
710
    def _get_properties(self, node, until=None):
711
        """Return properties until the timestamp given."""
712
        
713
        before = until if until is not None else inf
714
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
715
        if props is None and until is not None:
716
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
717
        if props is None:
718
            raise NameError('Path does not exist')
719
        return props
720
    
721
    def _get_statistics(self, node, until=None):
722
        """Return count, sum of size and latest timestamp of everything under node."""
723
        
724
        if until is None:
725
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
726
        else:
727
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
728
        if stats is None:
729
            stats = (0, 0, 0)
730
        return stats
731
    
732
    def _get_version(self, node, version=None):
733
        if version is None:
734
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
735
            if props is None:
736
                raise NameError('Object does not exist')
737
        else:
738
            try:
739
                version = int(version)
740
            except ValueError:
741
                raise IndexError('Version does not exist')
742
            props = self.node.version_get_properties(version)
743
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
744
                raise IndexError('Version does not exist')
745
        return props
746
    
747
    def _put_version_duplicate(self, user, node, size=None, hash=None, cluster=CLUSTER_NORMAL):
748
        """Create a new version of the node."""
749
        
750
        props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
751
        if props is not None:
752
            src_version_id = props[self.SERIAL]
753
            src_hash = props[self.HASH]
754
            src_size = props[self.SIZE]
755
        else:
756
            src_version_id = None
757
            src_hash = None
758
            src_size = 0
759
        if size is None:
760
            hash = src_hash # This way hash can be set to None.
761
            size = src_size
762
        
763
        if src_version_id is not None:
764
            self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
765
        dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, cluster)
766
        return src_version_id, dest_version_id
767
    
768
    def _put_metadata(self, user, node, meta, replace=False):
769
        """Create a new version and store metadata."""
770
        
771
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
772
        
773
        # TODO: Merge with other functions that update metadata...
774
        if not replace:
775
            if src_version_id is not None:
776
                self.node.attribute_copy(src_version_id, dest_version_id)
777
            self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
778
            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
779
        else:
780
            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
781
        return src_version_id, dest_version_id
782
    
783
    def _list_limits(self, listing, marker, limit):
784
        start = 0
785
        if marker:
786
            try:
787
                start = listing.index(marker) + 1
788
            except ValueError:
789
                pass
790
        if not limit or limit > 10000:
791
            limit = 10000
792
        return start, limit
793
    
794
    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
795
        cont_prefix = path + '/'
796
        prefix = cont_prefix + prefix
797
        start = cont_prefix + marker if marker else None
798
        before = until if until is not None else inf
799
        filterq = ','.join(keys) if keys else None
800
        
801
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
802
        objects.extend([(p, None) for p in prefixes] if virtual else [])
803
        objects.sort(key=lambda x: x[0])
804
        objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
805
        
806
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
807
        return objects[start:start + limit]
808
    
809
    # Policy functions.
810
    
811
    def _check_policy(self, policy):
812
        for k in policy.keys():
813
            if policy[k] == '':
814
                policy[k] = self.default_policy.get(k)
815
        for k, v in policy.iteritems():
816
            if k == 'quota':
817
                q = int(v) # May raise ValueError.
818
                if q < 0:
819
                    raise ValueError
820
            elif k == 'versioning':
821
                if v not in ['auto', 'none']:
822
                    raise ValueError
823
            else:
824
                raise ValueError
825
    
826
    def _put_policy(self, node, policy, replace):
827
        if replace:
828
            for k, v in self.default_policy.iteritems():
829
                if k not in policy:
830
                    policy[k] = v
831
        self.node.policy_set(node, policy)
832
    
833
    def _get_policy(self, node):
834
        policy = self.default_policy.copy()
835
        policy.update(self.node.policy_get(node))
836
        return policy
837
    
838
    def _apply_versioning(self, account, container, version_id):
839
        if version_id is None:
840
            return
841
        path, node = self._lookup_container(account, container)
842
        versioning = self._get_policy(node)['versioning']
843
        if versioning != 'auto':
844
            hash = self.node.version_remove(version_id)
845
            self.store.map_delete(hash)
846
    
847
    # Access control functions.
848
    
849
    def _check_groups(self, groups):
850
        # raise ValueError('Bad characters in groups')
851
        pass
852
    
853
    def _check_permissions(self, path, permissions):
854
        # raise ValueError('Bad characters in permissions')
855
        
856
        # Check for existing permissions.
857
        paths = self.permissions.access_list(path)
858
        if paths:
859
            ae = AttributeError()
860
            ae.data = paths
861
            raise ae
862
    
863
    def _can_read(self, user, account, container, name):
864
        if user == account:
865
            return True
866
        path = '/'.join((account, container, name))
867
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
868
            raise NotAllowedError
869
    
870
    def _can_write(self, user, account, container, name):
871
        if user == account:
872
            return True
873
        path = '/'.join((account, container, name))
874
        if not self.permissions.access_check(path, self.WRITE, user):
875
            raise NotAllowedError
876
    
877
    def _allowed_accounts(self, user):
878
        allow = set()
879
        for path in self.permissions.access_list_paths(user):
880
            allow.add(path.split('/', 1)[0])
881
        return sorted(allow)
882
    
883
    def _allowed_containers(self, user, account):
884
        allow = set()
885
        for path in self.permissions.access_list_paths(user, account):
886
            allow.add(path.split('/', 2)[1])
887
        return sorted(allow)