Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / modular.py @ cb446fb8

History | View | Annotate | Download (37.9 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import logging
38
import hashlib
39
import binascii
40

    
41
from base import NotAllowedError, QuotaError, BaseBackend
42

    
43
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
44

    
45
inf = float('inf')
46

    
47
ULTIMATE_ANSWER = 42
48

    
49

    
50
logger = logging.getLogger(__name__)
51

    
52

    
53
class HashMap(list):
54
    
55
    def __init__(self, blocksize, blockhash):
56
        super(HashMap, self).__init__()
57
        self.blocksize = blocksize
58
        self.blockhash = blockhash
59
    
60
    def _hash_raw(self, v):
61
        h = hashlib.new(self.blockhash)
62
        h.update(v)
63
        return h.digest()
64
    
65
    def hash(self):
66
        if len(self) == 0:
67
            return self._hash_raw('')
68
        if len(self) == 1:
69
            return self.__getitem__(0)
70
        
71
        h = list(self)
72
        s = 2
73
        while s < len(h):
74
            s = s * 2
75
        h += [('\x00' * len(h[0]))] * (s - len(h))
76
        while len(h) > 1:
77
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
78
        return h[0]
79

    
80

    
81
def backend_method(func=None, autocommit=1):
82
    if func is None:
83
        def fn(func):
84
            return backend_method(func, autocommit)
85
        return fn
86

    
87
    if not autocommit:
88
        return func
89
    def fn(self, *args, **kw):
90
        self.wrapper.execute()
91
        try:
92
            ret = func(self, *args, **kw)
93
            self.wrapper.commit()
94
            return ret
95
        except:
96
            self.wrapper.rollback()
97
            raise
98
    return fn
99

    
100

    
101
class ModularBackend(BaseBackend):
102
    """A modular backend.
103
    
104
    Uses modules for SQL functions and storage.
105
    """
106
    
107
    def __init__(self, db_module, db_connection, block_module, block_path):
108
        self.hash_algorithm = 'sha256'
109
        self.block_size = 4 * 1024 * 1024 # 4MB
110
        
111
        self.default_policy = {'quota': 0, 'versioning': 'auto'}
112
        
113
        __import__(db_module)
114
        self.db_module = sys.modules[db_module]
115
        self.wrapper = self.db_module.DBWrapper(db_connection)
116
        
117
        params = {'wrapper': self.wrapper}
118
        self.permissions = self.db_module.Permissions(**params)
119
        for x in ['READ', 'WRITE']:
120
            setattr(self, x, getattr(self.db_module, x))
121
        self.node = self.db_module.Node(**params)
122
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
123
            setattr(self, x, getattr(self.db_module, x))
124
        
125
        __import__(block_module)
126
        self.block_module = sys.modules[block_module]
127
        
128
        params = {'path': block_path,
129
                  'block_size': self.block_size,
130
                  'hash_algorithm': self.hash_algorithm}
131
        self.store = self.block_module.Store(**params)
132
    
133
    def close(self):
134
        self.wrapper.close()
135
    
136
    @backend_method
137
    def list_accounts(self, user, marker=None, limit=10000):
138
        """Return a list of accounts the user can access."""
139
        
140
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
141
        allowed = self._allowed_accounts(user)
142
        start, limit = self._list_limits(allowed, marker, limit)
143
        return allowed[start:start + limit]
144
    
145
    @backend_method
146
    def get_account_meta(self, user, account, until=None):
147
        """Return a dictionary with the account metadata."""
148
        
149
        logger.debug("get_account_meta: %s %s", account, until)
150
        path, node = self._lookup_account(account, user == account)
151
        if user != account:
152
            if until or node is None or account not in self._allowed_accounts(user):
153
                raise NotAllowedError
154
        try:
155
            props = self._get_properties(node, until)
156
            mtime = props[self.MTIME]
157
        except NameError:
158
            props = None
159
            mtime = until
160
        count, bytes, tstamp = self._get_statistics(node, until)
161
        tstamp = max(tstamp, mtime)
162
        if until is None:
163
            modified = tstamp
164
        else:
165
            modified = self._get_statistics(node)[2] # Overall last modification.
166
            modified = max(modified, mtime)
167
        
168
        if user != account:
169
            meta = {'name': account}
170
        else:
171
            meta = {}
172
            if props is not None:
173
                meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
174
            if until is not None:
175
                meta.update({'until_timestamp': tstamp})
176
            meta.update({'name': account, 'count': count, 'bytes': bytes})
177
        meta.update({'modified': modified})
178
        return meta
179
    
180
    @backend_method
181
    def update_account_meta(self, user, account, meta, replace=False):
182
        """Update the metadata associated with the account."""
183
        
184
        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
185
        if user != account:
186
            raise NotAllowedError
187
        path, node = self._lookup_account(account, True)
188
        self._put_metadata(user, node, meta, replace)
189
    
190
    @backend_method
191
    def get_account_groups(self, user, account):
192
        """Return a dictionary with the user groups defined for this account."""
193
        
194
        logger.debug("get_account_groups: %s", account)
195
        if user != account:
196
            if account not in self._allowed_accounts(user):
197
                raise NotAllowedError
198
            return {}
199
        self._lookup_account(account, True)
200
        return self.permissions.group_dict(account)
201
    
202
    @backend_method
203
    def update_account_groups(self, user, account, groups, replace=False):
204
        """Update the groups associated with the account."""
205
        
206
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
207
        if user != account:
208
            raise NotAllowedError
209
        self._lookup_account(account, True)
210
        self._check_groups(groups)
211
        if replace:
212
            self.permissions.group_destroy(account)
213
        for k, v in groups.iteritems():
214
            if not replace: # If not already deleted.
215
                self.permissions.group_delete(account, k)
216
            if v:
217
                self.permissions.group_addmany(account, k, v)
218
    
219
    @backend_method
220
    def get_account_policy(self, user, account):
221
        """Return a dictionary with the account policy."""
222
        
223
        logger.debug("get_account_policy: %s", account)
224
        if user != account:
225
            if account not in self._allowed_accounts(user):
226
                raise NotAllowedError
227
            return {}
228
        path, node = self._lookup_account(account, True)
229
        return self._get_policy(node)
230
    
231
    @backend_method
232
    def update_account_policy(self, user, account, policy, replace=False):
233
        """Update the policy associated with the account."""
234
        
235
        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
236
        if user != account:
237
            raise NotAllowedError
238
        path, node = self._lookup_account(account, True)
239
        self._check_policy(policy)
240
        self._put_policy(node, policy, replace)
241
    
242
    @backend_method
243
    def put_account(self, user, account, policy={}):
244
        """Create a new account with the given name."""
245
        
246
        logger.debug("put_account: %s %s", account, policy)
247
        if user != account:
248
            raise NotAllowedError
249
        node = self.node.node_lookup(account)
250
        if node is not None:
251
            raise NameError('Account already exists')
252
        if policy:
253
            self._check_policy(policy)
254
        node = self._put_path(user, self.ROOTNODE, account)
255
        self._put_policy(node, policy, True)
256
    
257
    @backend_method
258
    def delete_account(self, user, account):
259
        """Delete the account with the given name."""
260
        
261
        logger.debug("delete_account: %s", account)
262
        if user != account:
263
            raise NotAllowedError
264
        node = self.node.node_lookup(account)
265
        if node is None:
266
            return
267
        if not self.node.node_remove(node):
268
            raise IndexError('Account is not empty')
269
        self.permissions.group_destroy(account)
270
    
271
    @backend_method
272
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
273
        """Return a list of containers existing under an account."""
274
        
275
        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
276
        if user != account:
277
            if until or account not in self._allowed_accounts(user):
278
                raise NotAllowedError
279
            allowed = self._allowed_containers(user, account)
280
            start, limit = self._list_limits(allowed, marker, limit)
281
            return allowed[start:start + limit]
282
        if shared:
283
            allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
284
            allowed = list(set(allowed))
285
            start, limit = self._list_limits(allowed, marker, limit)
286
            return allowed[start:start + limit]
287
        node = self.node.node_lookup(account)
288
        return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
289
    
290
    @backend_method
291
    def get_container_meta(self, user, account, container, until=None):
292
        """Return a dictionary with the container metadata."""
293
        
294
        logger.debug("get_container_meta: %s %s %s", account, container, until)
295
        if user != account:
296
            if until or container not in self._allowed_containers(user, account):
297
                raise NotAllowedError
298
        path, node = self._lookup_container(account, container)
299
        props = self._get_properties(node, until)
300
        mtime = props[self.MTIME]
301
        count, bytes, tstamp = self._get_statistics(node, until)
302
        tstamp = max(tstamp, mtime)
303
        if until is None:
304
            modified = tstamp
305
        else:
306
            modified = self._get_statistics(node)[2] # Overall last modification.
307
            modified = max(modified, mtime)
308
        
309
        if user != account:
310
            meta = {'name': container}
311
        else:
312
            meta = dict(self.node.attribute_get(props[self.SERIAL]))
313
            if until is not None:
314
                meta.update({'until_timestamp': tstamp})
315
            meta.update({'name': container, 'count': count, 'bytes': bytes})
316
        meta.update({'modified': modified})
317
        return meta
318
    
319
    @backend_method
320
    def update_container_meta(self, user, account, container, meta, replace=False):
321
        """Update the metadata associated with the container."""
322
        
323
        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
324
        if user != account:
325
            raise NotAllowedError
326
        path, node = self._lookup_container(account, container)
327
        self._put_metadata(user, node, meta, replace)
328
    
329
    @backend_method
330
    def get_container_policy(self, user, account, container):
331
        """Return a dictionary with the container policy."""
332
        
333
        logger.debug("get_container_policy: %s %s", account, container)
334
        if user != account:
335
            if container not in self._allowed_containers(user, account):
336
                raise NotAllowedError
337
            return {}
338
        path, node = self._lookup_container(account, container)
339
        return self._get_policy(node)
340
    
341
    @backend_method
342
    def update_container_policy(self, user, account, container, policy, replace=False):
343
        """Update the policy associated with the container."""
344
        
345
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
346
        if user != account:
347
            raise NotAllowedError
348
        path, node = self._lookup_container(account, container)
349
        self._check_policy(policy)
350
        self._put_policy(node, policy, replace)
351
    
352
    @backend_method
353
    def put_container(self, user, account, container, policy={}):
354
        """Create a new container with the given name."""
355
        
356
        logger.debug("put_container: %s %s %s", account, container, policy)
357
        if user != account:
358
            raise NotAllowedError
359
        try:
360
            path, node = self._lookup_container(account, container)
361
        except NameError:
362
            pass
363
        else:
364
            raise NameError('Container already exists')
365
        if policy:
366
            self._check_policy(policy)
367
        path = '/'.join((account, container))
368
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
369
        self._put_policy(node, policy, True)
370
    
371
    @backend_method
372
    def delete_container(self, user, account, container, until=None):
373
        """Delete/purge the container with the given name."""
374
        
375
        logger.debug("delete_container: %s %s %s", account, container, until)
376
        if user != account:
377
            raise NotAllowedError
378
        path, node = self._lookup_container(account, container)
379
        
380
        if until is not None:
381
            hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
382
            for h in hashes:
383
                self.store.map_delete(h)
384
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
385
            return
386
        
387
        if self._get_statistics(node)[0] > 0:
388
            raise IndexError('Container is not empty')
389
        hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
390
        for h in hashes:
391
            self.store.map_delete(h)
392
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
393
        self.node.node_remove(node)
394
    
395
    @backend_method
396
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
397
        """Return a list of objects existing under a container."""
398
        
399
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
400
        allowed = []
401
        if user != account:
402
            if until:
403
                raise NotAllowedError
404
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
405
            if not allowed:
406
                raise NotAllowedError
407
        else:
408
            if shared:
409
                allowed = self.permissions.access_list_shared('/'.join((account, container)))
410
                if not allowed:
411
                    return []
412
        path, node = self._lookup_container(account, container)
413
        return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
414
    
415
    @backend_method
416
    def list_object_meta(self, user, account, container, until=None):
417
        """Return a list with all the container's object meta keys."""
418
        
419
        logger.debug("list_object_meta: %s %s %s", account, container, until)
420
        allowed = []
421
        if user != account:
422
            if until:
423
                raise NotAllowedError
424
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
425
            if not allowed:
426
                raise NotAllowedError
427
        path, node = self._lookup_container(account, container)
428
        before = until if until is not None else inf
429
        return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
430
    
431
    @backend_method
432
    def get_object_meta(self, user, account, container, name, version=None):
433
        """Return a dictionary with the object metadata."""
434
        
435
        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
436
        self._can_read(user, account, container, name)
437
        path, node = self._lookup_object(account, container, name)
438
        props = self._get_version(node, version)
439
        if version is None:
440
            modified = props[self.MTIME]
441
        else:
442
            try:
443
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
444
            except NameError: # Object may be deleted.
445
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
446
                if del_props is None:
447
                    raise NameError('Object does not exist')
448
                modified = del_props[self.MTIME]
449
        
450
        meta = dict(self.node.attribute_get(props[self.SERIAL]))
451
        meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
452
        meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
453
        meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
454
        return meta
455
    
456
    @backend_method
457
    def update_object_meta(self, user, account, container, name, meta, replace=False):
458
        """Update the metadata associated with the object."""
459
        
460
        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
461
        self._can_write(user, account, container, name)
462
        path, node = self._lookup_object(account, container, name)
463
        src_version_id, dest_version_id = self._put_metadata(user, node, meta, replace)
464
        self._apply_versioning(account, container, src_version_id)
465
        return dest_version_id
466
    
467
    @backend_method
468
    def get_object_permissions(self, user, account, container, name):
469
        """Return the action allowed on the object, the path
470
        from which the object gets its permissions from,
471
        along with a dictionary containing the permissions."""
472
        
473
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
474
        allowed = 'write'
475
        if user != account:
476
            path = '/'.join((account, container, name))
477
            if self.permissions.access_check(path, self.WRITE, user):
478
                allowed = 'write'
479
            elif self.permissions.access_check(path, self.READ, user):
480
                allowed = 'read'
481
            else:
482
                raise NotAllowedError
483
        path = self._lookup_object(account, container, name)[0]
484
        return (allowed,) + self.permissions.access_inherit(path)
485
    
486
    @backend_method
487
    def update_object_permissions(self, user, account, container, name, permissions):
488
        """Update the permissions associated with the object."""
489
        
490
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
491
        if user != account:
492
            raise NotAllowedError
493
        path = self._lookup_object(account, container, name)[0]
494
        self._check_permissions(path, permissions)
495
        self.permissions.access_set(path, permissions)
496
    
497
    @backend_method
498
    def get_object_public(self, user, account, container, name):
499
        """Return the public id of the object if applicable."""
500
        
501
        logger.debug("get_object_public: %s %s %s", account, container, name)
502
        self._can_read(user, account, container, name)
503
        path = self._lookup_object(account, container, name)[0]
504
        p = self.permissions.public_get(path)
505
        if p is not None:
506
            p += ULTIMATE_ANSWER
507
        return p
508
    
509
    @backend_method
510
    def update_object_public(self, user, account, container, name, public):
511
        """Update the public status of the object."""
512
        
513
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
514
        self._can_write(user, account, container, name)
515
        path = self._lookup_object(account, container, name)[0]
516
        if not public:
517
            self.permissions.public_unset(path)
518
        else:
519
            self.permissions.public_set(path)
520
    
521
    @backend_method
522
    def get_object_hashmap(self, user, account, container, name, version=None):
523
        """Return the object's size and a list with partial hashes."""
524
        
525
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
526
        self._can_read(user, account, container, name)
527
        path, node = self._lookup_object(account, container, name)
528
        props = self._get_version(node, version)
529
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
530
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
531
    
532
    def _update_object_hash(self, user, account, container, name, size, hash, meta={}, replace_meta=False, permissions=None):
533
        if permissions is not None and user != account:
534
            raise NotAllowedError
535
        self._can_write(user, account, container, name)
536
        if permissions is not None:
537
            path = '/'.join((account, container, name))
538
            self._check_permissions(path, permissions)
539
        
540
        account_path, account_node = self._lookup_account(account, True)
541
        container_path, container_node = self._lookup_container(account, container)
542
        path, node = self._put_object_node(container_path, container_node, name)
543
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size, hash)
544
        
545
        # Check quota.
546
        size_delta = size # Change with versioning.
547
        if size_delta > 0:
548
            account_quota = long(self._get_policy(account_node)['quota'])
549
            container_quota = long(self._get_policy(container_node)['quota'])
550
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
551
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
552
                # This must be executed in a transaction, so the version is never created if it fails.
553
                raise QuotaError
554
        
555
        if not replace_meta and src_version_id is not None:
556
            self.node.attribute_copy(src_version_id, dest_version_id)
557
        self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
558
        if permissions is not None:
559
            self.permissions.access_set(path, permissions)
560
        self._apply_versioning(account, container, src_version_id)
561
        return dest_version_id
562
    
563
    @backend_method
564
    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
565
        """Create/update an object with the specified size and partial hashes."""
566
        
567
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
568
        if size == 0: # No such thing as an empty hashmap.
569
            hashmap = [self.put_block('')]
570
        map = HashMap(self.block_size, self.hash_algorithm)
571
        map.extend([binascii.unhexlify(x) for x in hashmap])
572
        missing = self.store.block_search(map)
573
        if missing:
574
            ie = IndexError()
575
            ie.data = [binascii.hexlify(x) for x in missing]
576
            raise ie
577
        
578
        hash = map.hash()
579
        dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), meta, replace_meta, permissions)
580
        self.store.map_put(hash, map)
581
        return dest_version_id
582
    
583
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
584
        self._can_read(user, src_account, src_container, src_name)
585
        path, node = self._lookup_object(src_account, src_container, src_name)
586
        props = self._get_version(node, src_version)
587
        src_version_id = props[self.SERIAL]
588
        hash = props[self.HASH]
589
        size = props[self.SIZE]
590
        
591
        if (src_account, src_container, src_name) == (dest_account, dest_container, dest_name):
592
            dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, dest_meta, replace_meta, permissions)
593
        else:
594
            if replace_meta:
595
                meta = dest_meta
596
            else:
597
                meta = {}
598
            dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, meta, True, permissions)
599
            if not replace_meta:
600
                self.node.attribute_copy(src_version_id, dest_version_id)
601
                self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
602
        return dest_version_id
603
    
604
    @backend_method
605
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
606
        """Copy an object's data and metadata."""
607
        
608
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
609
        return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
610
    
611
    @backend_method
612
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
613
        """Move an object's data and metadata."""
614
        
615
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions)
616
        if user != src_account:
617
            raise NotAllowedError
618
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
619
        if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
620
            self._delete_object(user, src_account, src_container, src_name)
621
        return dest_version_id
622
    
623
    def _delete_object(self, user, account, container, name, until=None):
624
        if user != account:
625
            raise NotAllowedError
626
        
627
        if until is not None:
628
            path = '/'.join((account, container, name))
629
            node = self.node.node_lookup(path)
630
            if node is None:
631
                return
632
            hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
633
            hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
634
            for h in hashes:
635
                self.store.map_delete(h)
636
            self.node.node_purge(node, until, CLUSTER_DELETED)
637
            try:
638
                props = self._get_version(node)
639
            except NameError:
640
                self.permissions.access_clear(path)
641
            return
642
        
643
        path, node = self._lookup_object(account, container, name)
644
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, 0, None, CLUSTER_DELETED)
645
        self._apply_versioning(account, container, src_version_id)
646
        self.permissions.access_clear(path)
647
    
648
    @backend_method
649
    def delete_object(self, user, account, container, name, until=None):
650
        """Delete/purge an object."""
651
        
652
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
653
        self._delete_object(user, account, container, name, until)
654
    
655
    @backend_method
656
    def list_versions(self, user, account, container, name):
657
        """Return a list of all (version, version_timestamp) tuples for an object."""
658
        
659
        logger.debug("list_versions: %s %s %s", account, container, name)
660
        self._can_read(user, account, container, name)
661
        path, node = self._lookup_object(account, container, name)
662
        versions = self.node.node_get_versions(node)
663
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
664
    
665
    @backend_method
666
    def get_public(self, user, public):
667
        """Return the (account, container, name) for the public id given."""
668
        logger.debug("get_public: %s", public)
669
        if public is None or public < ULTIMATE_ANSWER:
670
            raise NameError
671
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
672
        account, container, name = path.split('/', 2)
673
        self._can_read(user, account, container, name)
674
        return (account, container, name)
675
    
676
    @backend_method(autocommit=0)
677
    def get_block(self, hash):
678
        """Return a block's data."""
679
        
680
        logger.debug("get_block: %s", hash)
681
        block = self.store.block_get(binascii.unhexlify(hash))
682
        if not block:
683
            raise NameError('Block does not exist')
684
        return block
685
    
686
    @backend_method(autocommit=0)
687
    def put_block(self, data):
688
        """Store a block and return the hash."""
689
        
690
        logger.debug("put_block: %s", len(data))
691
        return binascii.hexlify(self.store.block_put(data))
692
    
693
    @backend_method(autocommit=0)
694
    def update_block(self, hash, data, offset=0):
695
        """Update a known block and return the hash."""
696
        
697
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
698
        if offset == 0 and len(data) == self.block_size:
699
            return self.put_block(data)
700
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
701
        return binascii.hexlify(h)
702
    
703
    # Path functions.
704
    
705
    def _put_object_node(self, path, parent, name):
706
        path = '/'.join((path, name))
707
        node = self.node.node_lookup(path)
708
        if node is None:
709
            node = self.node.node_create(parent, path)
710
        return path, node
711
    
712
    def _put_path(self, user, parent, path):
713
        node = self.node.node_create(parent, path)
714
        self.node.version_create(node, None, 0, None, user, CLUSTER_NORMAL)
715
        return node
716
    
717
    def _lookup_account(self, account, create=True):
718
        node = self.node.node_lookup(account)
719
        if node is None and create:
720
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
721
        return account, node
722
    
723
    def _lookup_container(self, account, container):
724
        path = '/'.join((account, container))
725
        node = self.node.node_lookup(path)
726
        if node is None:
727
            raise NameError('Container does not exist')
728
        return path, node
729
    
730
    def _lookup_object(self, account, container, name):
731
        path = '/'.join((account, container, name))
732
        node = self.node.node_lookup(path)
733
        if node is None:
734
            raise NameError('Object does not exist')
735
        return path, node
736
    
737
    def _get_properties(self, node, until=None):
738
        """Return properties until the timestamp given."""
739
        
740
        before = until if until is not None else inf
741
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
742
        if props is None and until is not None:
743
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
744
        if props is None:
745
            raise NameError('Path does not exist')
746
        return props
747
    
748
    def _get_statistics(self, node, until=None):
749
        """Return count, sum of size and latest timestamp of everything under node."""
750
        
751
        if until is None:
752
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
753
        else:
754
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
755
        if stats is None:
756
            stats = (0, 0, 0)
757
        return stats
758
    
759
    def _get_version(self, node, version=None):
760
        if version is None:
761
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
762
            if props is None:
763
                raise NameError('Object does not exist')
764
        else:
765
            try:
766
                version = int(version)
767
            except ValueError:
768
                raise IndexError('Version does not exist')
769
            props = self.node.version_get_properties(version)
770
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
771
                raise IndexError('Version does not exist')
772
        return props
773
    
774
    def _put_version_duplicate(self, user, node, size=None, hash=None, cluster=CLUSTER_NORMAL):
775
        """Create a new version of the node."""
776
        
777
        props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
778
        if props is not None:
779
            src_version_id = props[self.SERIAL]
780
            src_hash = props[self.HASH]
781
            src_size = props[self.SIZE]
782
        else:
783
            src_version_id = None
784
            src_hash = None
785
            src_size = 0
786
        if size is None:
787
            hash = src_hash # This way hash can be set to None.
788
            size = src_size
789
        
790
        if src_version_id is not None:
791
            self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
792
        dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, cluster)
793
        return src_version_id, dest_version_id
794
    
795
    def _put_metadata(self, user, node, meta, replace=False):
796
        """Create a new version and store metadata."""
797
        
798
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
799
        
800
        # TODO: Merge with other functions that update metadata...
801
        if not replace:
802
            if src_version_id is not None:
803
                self.node.attribute_copy(src_version_id, dest_version_id)
804
            self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
805
            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
806
        else:
807
            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
808
        return src_version_id, dest_version_id
809
    
810
    def _list_limits(self, listing, marker, limit):
811
        start = 0
812
        if marker:
813
            try:
814
                start = listing.index(marker) + 1
815
            except ValueError:
816
                pass
817
        if not limit or limit > 10000:
818
            limit = 10000
819
        return start, limit
820
    
821
    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
822
        cont_prefix = path + '/'
823
        prefix = cont_prefix + prefix
824
        start = cont_prefix + marker if marker else None
825
        before = until if until is not None else inf
826
        filterq = ','.join(keys) if keys else None
827
        
828
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
829
        objects.extend([(p, None) for p in prefixes] if virtual else [])
830
        objects.sort(key=lambda x: x[0])
831
        objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
832
        
833
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
834
        return objects[start:start + limit]
835
    
836
    # Policy functions.
837
    
838
    def _check_policy(self, policy):
839
        for k in policy.keys():
840
            if policy[k] == '':
841
                policy[k] = self.default_policy.get(k)
842
        for k, v in policy.iteritems():
843
            if k == 'quota':
844
                q = int(v) # May raise ValueError.
845
                if q < 0:
846
                    raise ValueError
847
            elif k == 'versioning':
848
                if v not in ['auto', 'none']:
849
                    raise ValueError
850
            else:
851
                raise ValueError
852
    
853
    def _put_policy(self, node, policy, replace):
854
        if replace:
855
            for k, v in self.default_policy.iteritems():
856
                if k not in policy:
857
                    policy[k] = v
858
        self.node.policy_set(node, policy)
859
    
860
    def _get_policy(self, node):
861
        policy = self.default_policy.copy()
862
        policy.update(self.node.policy_get(node))
863
        return policy
864
    
865
    def _apply_versioning(self, account, container, version_id):
866
        if version_id is None:
867
            return
868
        path, node = self._lookup_container(account, container)
869
        versioning = self._get_policy(node)['versioning']
870
        if versioning != 'auto':
871
            self.node.version_remove(version_id)
872
    
873
    # Access control functions.
874
    
875
    def _check_groups(self, groups):
876
        # raise ValueError('Bad characters in groups')
877
        pass
878
    
879
    def _check_permissions(self, path, permissions):
880
        # raise ValueError('Bad characters in permissions')
881
        
882
        # Check for existing permissions.
883
        paths = self.permissions.access_list(path)
884
        if paths:
885
            ae = AttributeError()
886
            ae.data = paths
887
            raise ae
888
    
889
    def _can_read(self, user, account, container, name):
890
        if user == account:
891
            return True
892
        path = '/'.join((account, container, name))
893
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
894
            raise NotAllowedError
895
    
896
    def _can_write(self, user, account, container, name):
897
        if user == account:
898
            return True
899
        path = '/'.join((account, container, name))
900
        if not self.permissions.access_check(path, self.WRITE, user):
901
            raise NotAllowedError
902
    
903
    def _allowed_accounts(self, user):
904
        allow = set()
905
        for path in self.permissions.access_list_paths(user):
906
            allow.add(path.split('/', 1)[0])
907
        return sorted(allow)
908
    
909
    def _allowed_containers(self, user, account):
910
        allow = set()
911
        for path in self.permissions.access_list_paths(user, account):
912
            allow.add(path.split('/', 2)[1])
913
        return sorted(allow)