Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / modular.py @ 3d13f97a

History | View | Annotate | Download (37.4 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import logging
38
import binascii
39

    
40
from base import NotAllowedError, QuotaError, BaseBackend
41

    
42
from pithos.lib.hashmap import HashMap
43

    
44
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
45

    
46
inf = float('inf')
47

    
48
ULTIMATE_ANSWER = 42
49

    
50

    
51
logger = logging.getLogger(__name__)
52

    
53

    
54
def backend_method(func=None, autocommit=1):
55
    if func is None:
56
        def fn(func):
57
            return backend_method(func, autocommit)
58
        return fn
59

    
60
    if not autocommit:
61
        return func
62
    def fn(self, *args, **kw):
63
        self.wrapper.execute()
64
        try:
65
            ret = func(self, *args, **kw)
66
            self.wrapper.commit()
67
            return ret
68
        except:
69
            self.wrapper.rollback()
70
            raise
71
    return fn
72

    
73

    
74
class ModularBackend(BaseBackend):
75
    """A modular backend.
76
    
77
    Uses modules for SQL functions and storage.
78
    """
79
    
80
    def __init__(self, db_module, db_connection, block_module, block_path):
81
        db_module = db_module or 'pithos.backends.lib.sqlalchemy'
82
        block_module = block_module or 'pithos.backends.lib.hashfiler'
83
        
84
        self.hash_algorithm = 'sha256'
85
        self.block_size = 4 * 1024 * 1024 # 4MB
86
        
87
        self.default_policy = {'quota': 0, 'versioning': 'auto'}
88
        
89
        __import__(db_module)
90
        self.db_module = sys.modules[db_module]
91
        self.wrapper = self.db_module.DBWrapper(db_connection)
92
        
93
        params = {'wrapper': self.wrapper}
94
        self.permissions = self.db_module.Permissions(**params)
95
        for x in ['READ', 'WRITE']:
96
            setattr(self, x, getattr(self.db_module, x))
97
        self.node = self.db_module.Node(**params)
98
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
99
            setattr(self, x, getattr(self.db_module, x))
100
        
101
        __import__(block_module)
102
        self.block_module = sys.modules[block_module]
103
        
104
        params = {'path': block_path,
105
                  'block_size': self.block_size,
106
                  'hash_algorithm': self.hash_algorithm}
107
        self.store = self.block_module.Store(**params)
108
    
109
    def close(self):
110
        self.wrapper.close()
111
    
112
    @backend_method
113
    def list_accounts(self, user, marker=None, limit=10000):
114
        """Return a list of accounts the user can access."""
115
        
116
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
117
        allowed = self._allowed_accounts(user)
118
        start, limit = self._list_limits(allowed, marker, limit)
119
        return allowed[start:start + limit]
120
    
121
    @backend_method
122
    def get_account_meta(self, user, account, until=None):
123
        """Return a dictionary with the account metadata."""
124
        
125
        logger.debug("get_account_meta: %s %s", account, until)
126
        path, node = self._lookup_account(account, user == account)
127
        if user != account:
128
            if until or node is None or account not in self._allowed_accounts(user):
129
                raise NotAllowedError
130
        try:
131
            props = self._get_properties(node, until)
132
            mtime = props[self.MTIME]
133
        except NameError:
134
            props = None
135
            mtime = until
136
        count, bytes, tstamp = self._get_statistics(node, until)
137
        tstamp = max(tstamp, mtime)
138
        if until is None:
139
            modified = tstamp
140
        else:
141
            modified = self._get_statistics(node)[2] # Overall last modification.
142
            modified = max(modified, mtime)
143
        
144
        if user != account:
145
            meta = {'name': account}
146
        else:
147
            meta = {}
148
            if props is not None:
149
                meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
150
            if until is not None:
151
                meta.update({'until_timestamp': tstamp})
152
            meta.update({'name': account, 'count': count, 'bytes': bytes})
153
        meta.update({'modified': modified})
154
        return meta
155
    
156
    @backend_method
157
    def update_account_meta(self, user, account, meta, replace=False):
158
        """Update the metadata associated with the account."""
159
        
160
        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
161
        if user != account:
162
            raise NotAllowedError
163
        path, node = self._lookup_account(account, True)
164
        self._put_metadata(user, node, meta, replace)
165
    
166
    @backend_method
167
    def get_account_groups(self, user, account):
168
        """Return a dictionary with the user groups defined for this account."""
169
        
170
        logger.debug("get_account_groups: %s", account)
171
        if user != account:
172
            if account not in self._allowed_accounts(user):
173
                raise NotAllowedError
174
            return {}
175
        self._lookup_account(account, True)
176
        return self.permissions.group_dict(account)
177
    
178
    @backend_method
179
    def update_account_groups(self, user, account, groups, replace=False):
180
        """Update the groups associated with the account."""
181
        
182
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
183
        if user != account:
184
            raise NotAllowedError
185
        self._lookup_account(account, True)
186
        self._check_groups(groups)
187
        if replace:
188
            self.permissions.group_destroy(account)
189
        for k, v in groups.iteritems():
190
            if not replace: # If not already deleted.
191
                self.permissions.group_delete(account, k)
192
            if v:
193
                self.permissions.group_addmany(account, k, v)
194
    
195
    @backend_method
196
    def get_account_policy(self, user, account):
197
        """Return a dictionary with the account policy."""
198
        
199
        logger.debug("get_account_policy: %s", account)
200
        if user != account:
201
            if account not in self._allowed_accounts(user):
202
                raise NotAllowedError
203
            return {}
204
        path, node = self._lookup_account(account, True)
205
        return self._get_policy(node)
206
    
207
    @backend_method
208
    def update_account_policy(self, user, account, policy, replace=False):
209
        """Update the policy associated with the account."""
210
        
211
        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
212
        if user != account:
213
            raise NotAllowedError
214
        path, node = self._lookup_account(account, True)
215
        self._check_policy(policy)
216
        self._put_policy(node, policy, replace)
217
    
218
    @backend_method
219
    def put_account(self, user, account, policy={}):
220
        """Create a new account with the given name."""
221
        
222
        logger.debug("put_account: %s %s", account, policy)
223
        if user != account:
224
            raise NotAllowedError
225
        node = self.node.node_lookup(account)
226
        if node is not None:
227
            raise NameError('Account already exists')
228
        if policy:
229
            self._check_policy(policy)
230
        node = self._put_path(user, self.ROOTNODE, account)
231
        self._put_policy(node, policy, True)
232
    
233
    @backend_method
234
    def delete_account(self, user, account):
235
        """Delete the account with the given name."""
236
        
237
        logger.debug("delete_account: %s", account)
238
        if user != account:
239
            raise NotAllowedError
240
        node = self.node.node_lookup(account)
241
        if node is None:
242
            return
243
        if not self.node.node_remove(node):
244
            raise IndexError('Account is not empty')
245
        self.permissions.group_destroy(account)
246
    
247
    @backend_method
248
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
249
        """Return a list of containers existing under an account."""
250
        
251
        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
252
        if user != account:
253
            if until or account not in self._allowed_accounts(user):
254
                raise NotAllowedError
255
            allowed = self._allowed_containers(user, account)
256
            start, limit = self._list_limits(allowed, marker, limit)
257
            return allowed[start:start + limit]
258
        if shared:
259
            allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
260
            allowed = list(set(allowed))
261
            start, limit = self._list_limits(allowed, marker, limit)
262
            return allowed[start:start + limit]
263
        node = self.node.node_lookup(account)
264
        return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
265
    
266
    @backend_method
267
    def get_container_meta(self, user, account, container, until=None):
268
        """Return a dictionary with the container metadata."""
269
        
270
        logger.debug("get_container_meta: %s %s %s", account, container, until)
271
        if user != account:
272
            if until or container not in self._allowed_containers(user, account):
273
                raise NotAllowedError
274
        path, node = self._lookup_container(account, container)
275
        props = self._get_properties(node, until)
276
        mtime = props[self.MTIME]
277
        count, bytes, tstamp = self._get_statistics(node, until)
278
        tstamp = max(tstamp, mtime)
279
        if until is None:
280
            modified = tstamp
281
        else:
282
            modified = self._get_statistics(node)[2] # Overall last modification.
283
            modified = max(modified, mtime)
284
        
285
        if user != account:
286
            meta = {'name': container}
287
        else:
288
            meta = dict(self.node.attribute_get(props[self.SERIAL]))
289
            if until is not None:
290
                meta.update({'until_timestamp': tstamp})
291
            meta.update({'name': container, 'count': count, 'bytes': bytes})
292
        meta.update({'modified': modified})
293
        return meta
294
    
295
    @backend_method
296
    def update_container_meta(self, user, account, container, meta, replace=False):
297
        """Update the metadata associated with the container."""
298
        
299
        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
300
        if user != account:
301
            raise NotAllowedError
302
        path, node = self._lookup_container(account, container)
303
        self._put_metadata(user, node, meta, replace)
304
    
305
    @backend_method
306
    def get_container_policy(self, user, account, container):
307
        """Return a dictionary with the container policy."""
308
        
309
        logger.debug("get_container_policy: %s %s", account, container)
310
        if user != account:
311
            if container not in self._allowed_containers(user, account):
312
                raise NotAllowedError
313
            return {}
314
        path, node = self._lookup_container(account, container)
315
        return self._get_policy(node)
316
    
317
    @backend_method
318
    def update_container_policy(self, user, account, container, policy, replace=False):
319
        """Update the policy associated with the container."""
320
        
321
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
322
        if user != account:
323
            raise NotAllowedError
324
        path, node = self._lookup_container(account, container)
325
        self._check_policy(policy)
326
        self._put_policy(node, policy, replace)
327
    
328
    @backend_method
329
    def put_container(self, user, account, container, policy={}):
330
        """Create a new container with the given name."""
331
        
332
        logger.debug("put_container: %s %s %s", account, container, policy)
333
        if user != account:
334
            raise NotAllowedError
335
        try:
336
            path, node = self._lookup_container(account, container)
337
        except NameError:
338
            pass
339
        else:
340
            raise NameError('Container already exists')
341
        if policy:
342
            self._check_policy(policy)
343
        path = '/'.join((account, container))
344
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
345
        self._put_policy(node, policy, True)
346
    
347
    @backend_method
348
    def delete_container(self, user, account, container, until=None):
349
        """Delete/purge the container with the given name."""
350
        
351
        logger.debug("delete_container: %s %s %s", account, container, until)
352
        if user != account:
353
            raise NotAllowedError
354
        path, node = self._lookup_container(account, container)
355
        
356
        if until is not None:
357
            hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
358
            for h in hashes:
359
                self.store.map_delete(h)
360
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
361
            return
362
        
363
        if self._get_statistics(node)[0] > 0:
364
            raise IndexError('Container is not empty')
365
        hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
366
        for h in hashes:
367
            self.store.map_delete(h)
368
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
369
        self.node.node_remove(node)
370
    
371
    @backend_method
372
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
373
        """Return a list of objects existing under a container."""
374
        
375
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
376
        allowed = []
377
        if user != account:
378
            if until:
379
                raise NotAllowedError
380
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
381
            if not allowed:
382
                raise NotAllowedError
383
        else:
384
            if shared:
385
                allowed = self.permissions.access_list_shared('/'.join((account, container)))
386
                if not allowed:
387
                    return []
388
        path, node = self._lookup_container(account, container)
389
        return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
390
    
391
    @backend_method
392
    def list_object_meta(self, user, account, container, until=None):
393
        """Return a list with all the container's object meta keys."""
394
        
395
        logger.debug("list_object_meta: %s %s %s", account, container, until)
396
        allowed = []
397
        if user != account:
398
            if until:
399
                raise NotAllowedError
400
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
401
            if not allowed:
402
                raise NotAllowedError
403
        path, node = self._lookup_container(account, container)
404
        before = until if until is not None else inf
405
        return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
406
    
407
    @backend_method
408
    def get_object_meta(self, user, account, container, name, version=None):
409
        """Return a dictionary with the object metadata."""
410
        
411
        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
412
        self._can_read(user, account, container, name)
413
        path, node = self._lookup_object(account, container, name)
414
        props = self._get_version(node, version)
415
        if version is None:
416
            modified = props[self.MTIME]
417
        else:
418
            try:
419
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
420
            except NameError: # Object may be deleted.
421
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
422
                if del_props is None:
423
                    raise NameError('Object does not exist')
424
                modified = del_props[self.MTIME]
425
        
426
        meta = dict(self.node.attribute_get(props[self.SERIAL]))
427
        meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
428
        meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
429
        meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
430
        return meta
431
    
432
    @backend_method
433
    def update_object_meta(self, user, account, container, name, meta, replace=False):
434
        """Update the metadata associated with the object."""
435
        
436
        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
437
        self._can_write(user, account, container, name)
438
        path, node = self._lookup_object(account, container, name)
439
        src_version_id, dest_version_id = self._put_metadata(user, node, meta, replace)
440
        self._apply_versioning(account, container, src_version_id)
441
        return dest_version_id
442
    
443
    @backend_method
444
    def get_object_permissions(self, user, account, container, name):
445
        """Return the action allowed on the object, the path
446
        from which the object gets its permissions from,
447
        along with a dictionary containing the permissions."""
448
        
449
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
450
        allowed = 'write'
451
        if user != account:
452
            path = '/'.join((account, container, name))
453
            if self.permissions.access_check(path, self.WRITE, user):
454
                allowed = 'write'
455
            elif self.permissions.access_check(path, self.READ, user):
456
                allowed = 'read'
457
            else:
458
                raise NotAllowedError
459
        path = self._lookup_object(account, container, name)[0]
460
        return (allowed,) + self.permissions.access_inherit(path)
461
    
462
    @backend_method
463
    def update_object_permissions(self, user, account, container, name, permissions):
464
        """Update the permissions associated with the object."""
465
        
466
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
467
        if user != account:
468
            raise NotAllowedError
469
        path = self._lookup_object(account, container, name)[0]
470
        self._check_permissions(path, permissions)
471
        self.permissions.access_set(path, permissions)
472
    
473
    @backend_method
474
    def get_object_public(self, user, account, container, name):
475
        """Return the public id of the object if applicable."""
476
        
477
        logger.debug("get_object_public: %s %s %s", account, container, name)
478
        self._can_read(user, account, container, name)
479
        path = self._lookup_object(account, container, name)[0]
480
        p = self.permissions.public_get(path)
481
        if p is not None:
482
            p += ULTIMATE_ANSWER
483
        return p
484
    
485
    @backend_method
486
    def update_object_public(self, user, account, container, name, public):
487
        """Update the public status of the object."""
488
        
489
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
490
        self._can_write(user, account, container, name)
491
        path = self._lookup_object(account, container, name)[0]
492
        if not public:
493
            self.permissions.public_unset(path)
494
        else:
495
            self.permissions.public_set(path)
496
    
497
    @backend_method
498
    def get_object_hashmap(self, user, account, container, name, version=None):
499
        """Return the object's size and a list with partial hashes."""
500
        
501
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
502
        self._can_read(user, account, container, name)
503
        path, node = self._lookup_object(account, container, name)
504
        props = self._get_version(node, version)
505
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
506
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
507
    
508
    def _update_object_hash(self, user, account, container, name, size, hash, meta={}, replace_meta=False, permissions=None):
509
        if permissions is not None and user != account:
510
            raise NotAllowedError
511
        self._can_write(user, account, container, name)
512
        if permissions is not None:
513
            path = '/'.join((account, container, name))
514
            self._check_permissions(path, permissions)
515
        
516
        account_path, account_node = self._lookup_account(account, True)
517
        container_path, container_node = self._lookup_container(account, container)
518
        path, node = self._put_object_node(container_path, container_node, name)
519
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size, hash)
520
        
521
        # Check quota.
522
        size_delta = size # Change with versioning.
523
        if size_delta > 0:
524
            account_quota = long(self._get_policy(account_node)['quota'])
525
            container_quota = long(self._get_policy(container_node)['quota'])
526
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
527
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
528
                # This must be executed in a transaction, so the version is never created if it fails.
529
                raise QuotaError
530
        
531
        if not replace_meta and src_version_id is not None:
532
            self.node.attribute_copy(src_version_id, dest_version_id)
533
        self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
534
        if permissions is not None:
535
            self.permissions.access_set(path, permissions)
536
        self._apply_versioning(account, container, src_version_id)
537
        return dest_version_id
538
    
539
    @backend_method
540
    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
541
        """Create/update an object with the specified size and partial hashes."""
542
        
543
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
544
        if size == 0: # No such thing as an empty hashmap.
545
            hashmap = [self.put_block('')]
546
        map = HashMap(self.block_size, self.hash_algorithm)
547
        map.extend([binascii.unhexlify(x) for x in hashmap])
548
        missing = self.store.block_search(map)
549
        if missing:
550
            ie = IndexError()
551
            ie.data = [binascii.hexlify(x) for x in missing]
552
            raise ie
553
        
554
        hash = map.hash()
555
        dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), meta, replace_meta, permissions)
556
        self.store.map_put(hash, map)
557
        return dest_version_id
558
    
559
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
560
        self._can_read(user, src_account, src_container, src_name)
561
        path, node = self._lookup_object(src_account, src_container, src_name)
562
        props = self._get_version(node, src_version)
563
        src_version_id = props[self.SERIAL]
564
        hash = props[self.HASH]
565
        size = props[self.SIZE]
566
        
567
        if (src_account, src_container, src_name) == (dest_account, dest_container, dest_name):
568
            dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, dest_meta, replace_meta, permissions)
569
        else:
570
            if replace_meta:
571
                meta = dest_meta
572
            else:
573
                meta = {}
574
            dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, meta, True, permissions)
575
            if not replace_meta:
576
                self.node.attribute_copy(src_version_id, dest_version_id)
577
                self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
578
        return dest_version_id
579
    
580
    @backend_method
581
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
582
        """Copy an object's data and metadata."""
583
        
584
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
585
        return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
586
    
587
    @backend_method
588
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
589
        """Move an object's data and metadata."""
590
        
591
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions)
592
        if user != src_account:
593
            raise NotAllowedError
594
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
595
        if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
596
            self._delete_object(user, src_account, src_container, src_name)
597
        return dest_version_id
598
    
599
    def _delete_object(self, user, account, container, name, until=None):
600
        if user != account:
601
            raise NotAllowedError
602
        
603
        if until is not None:
604
            path = '/'.join((account, container, name))
605
            node = self.node.node_lookup(path)
606
            if node is None:
607
                return
608
            hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
609
            hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
610
            for h in hashes:
611
                self.store.map_delete(h)
612
            self.node.node_purge(node, until, CLUSTER_DELETED)
613
            try:
614
                props = self._get_version(node)
615
            except NameError:
616
                self.permissions.access_clear(path)
617
            return
618
        
619
        path, node = self._lookup_object(account, container, name)
620
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, 0, None, CLUSTER_DELETED)
621
        self._apply_versioning(account, container, src_version_id)
622
        self.permissions.access_clear(path)
623
    
624
    @backend_method
625
    def delete_object(self, user, account, container, name, until=None):
626
        """Delete/purge an object."""
627
        
628
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
629
        self._delete_object(user, account, container, name, until)
630
    
631
    @backend_method
632
    def list_versions(self, user, account, container, name):
633
        """Return a list of all (version, version_timestamp) tuples for an object."""
634
        
635
        logger.debug("list_versions: %s %s %s", account, container, name)
636
        self._can_read(user, account, container, name)
637
        path, node = self._lookup_object(account, container, name)
638
        versions = self.node.node_get_versions(node)
639
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
640
    
641
    @backend_method
642
    def get_public(self, user, public):
643
        """Return the (account, container, name) for the public id given."""
644
        logger.debug("get_public: %s", public)
645
        if public is None or public < ULTIMATE_ANSWER:
646
            raise NameError
647
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
648
        account, container, name = path.split('/', 2)
649
        self._can_read(user, account, container, name)
650
        return (account, container, name)
651
    
652
    @backend_method(autocommit=0)
653
    def get_block(self, hash):
654
        """Return a block's data."""
655
        
656
        logger.debug("get_block: %s", hash)
657
        block = self.store.block_get(binascii.unhexlify(hash))
658
        if not block:
659
            raise NameError('Block does not exist')
660
        return block
661
    
662
    @backend_method(autocommit=0)
663
    def put_block(self, data):
664
        """Store a block and return the hash."""
665
        
666
        logger.debug("put_block: %s", len(data))
667
        return binascii.hexlify(self.store.block_put(data))
668
    
669
    @backend_method(autocommit=0)
670
    def update_block(self, hash, data, offset=0):
671
        """Update a known block and return the hash."""
672
        
673
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
674
        if offset == 0 and len(data) == self.block_size:
675
            return self.put_block(data)
676
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
677
        return binascii.hexlify(h)
678
    
679
    # Path functions.
680
    
681
    def _put_object_node(self, path, parent, name):
682
        path = '/'.join((path, name))
683
        node = self.node.node_lookup(path)
684
        if node is None:
685
            node = self.node.node_create(parent, path)
686
        return path, node
687
    
688
    def _put_path(self, user, parent, path):
689
        node = self.node.node_create(parent, path)
690
        self.node.version_create(node, None, 0, None, user, CLUSTER_NORMAL)
691
        return node
692
    
693
    def _lookup_account(self, account, create=True):
694
        node = self.node.node_lookup(account)
695
        if node is None and create:
696
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
697
        return account, node
698
    
699
    def _lookup_container(self, account, container):
700
        path = '/'.join((account, container))
701
        node = self.node.node_lookup(path)
702
        if node is None:
703
            raise NameError('Container does not exist')
704
        return path, node
705
    
706
    def _lookup_object(self, account, container, name):
707
        path = '/'.join((account, container, name))
708
        node = self.node.node_lookup(path)
709
        if node is None:
710
            raise NameError('Object does not exist')
711
        return path, node
712
    
713
    def _get_properties(self, node, until=None):
714
        """Return properties until the timestamp given."""
715
        
716
        before = until if until is not None else inf
717
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
718
        if props is None and until is not None:
719
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
720
        if props is None:
721
            raise NameError('Path does not exist')
722
        return props
723
    
724
    def _get_statistics(self, node, until=None):
725
        """Return count, sum of size and latest timestamp of everything under node."""
726
        
727
        if until is None:
728
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
729
        else:
730
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
731
        if stats is None:
732
            stats = (0, 0, 0)
733
        return stats
734
    
735
    def _get_version(self, node, version=None):
736
        if version is None:
737
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
738
            if props is None:
739
                raise NameError('Object does not exist')
740
        else:
741
            try:
742
                version = int(version)
743
            except ValueError:
744
                raise IndexError('Version does not exist')
745
            props = self.node.version_get_properties(version)
746
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
747
                raise IndexError('Version does not exist')
748
        return props
749
    
750
    def _put_version_duplicate(self, user, node, size=None, hash=None, cluster=CLUSTER_NORMAL):
751
        """Create a new version of the node."""
752
        
753
        props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
754
        if props is not None:
755
            src_version_id = props[self.SERIAL]
756
            src_hash = props[self.HASH]
757
            src_size = props[self.SIZE]
758
        else:
759
            src_version_id = None
760
            src_hash = None
761
            src_size = 0
762
        if size is None:
763
            hash = src_hash # This way hash can be set to None.
764
            size = src_size
765
        
766
        if src_version_id is not None:
767
            self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
768
        dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, cluster)
769
        return src_version_id, dest_version_id
770
    
771
    def _put_metadata(self, user, node, meta, replace=False):
772
        """Create a new version and store metadata."""
773
        
774
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
775
        
776
        # TODO: Merge with other functions that update metadata...
777
        if not replace:
778
            if src_version_id is not None:
779
                self.node.attribute_copy(src_version_id, dest_version_id)
780
            self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
781
            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
782
        else:
783
            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
784
        return src_version_id, dest_version_id
785
    
786
    def _list_limits(self, listing, marker, limit):
787
        start = 0
788
        if marker:
789
            try:
790
                start = listing.index(marker) + 1
791
            except ValueError:
792
                pass
793
        if not limit or limit > 10000:
794
            limit = 10000
795
        return start, limit
796
    
797
    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
798
        cont_prefix = path + '/'
799
        prefix = cont_prefix + prefix
800
        start = cont_prefix + marker if marker else None
801
        before = until if until is not None else inf
802
        
803
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, keys)
804
        objects.extend([(p, None) for p in prefixes] if virtual else [])
805
        objects.sort(key=lambda x: x[0])
806
        objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
807
        
808
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
809
        return objects[start:start + limit]
810
    
811
    # Policy functions.
812
    
813
    def _check_policy(self, policy):
814
        for k in policy.keys():
815
            if policy[k] == '':
816
                policy[k] = self.default_policy.get(k)
817
        for k, v in policy.iteritems():
818
            if k == 'quota':
819
                q = int(v) # May raise ValueError.
820
                if q < 0:
821
                    raise ValueError
822
            elif k == 'versioning':
823
                if v not in ['auto', 'none']:
824
                    raise ValueError
825
            else:
826
                raise ValueError
827
    
828
    def _put_policy(self, node, policy, replace):
829
        if replace:
830
            for k, v in self.default_policy.iteritems():
831
                if k not in policy:
832
                    policy[k] = v
833
        self.node.policy_set(node, policy)
834
    
835
    def _get_policy(self, node):
836
        policy = self.default_policy.copy()
837
        policy.update(self.node.policy_get(node))
838
        return policy
839
    
840
    def _apply_versioning(self, account, container, version_id):
841
        if version_id is None:
842
            return
843
        path, node = self._lookup_container(account, container)
844
        versioning = self._get_policy(node)['versioning']
845
        if versioning != 'auto':
846
            hash = self.node.version_remove(version_id)
847
            self.store.map_delete(hash)
848
    
849
    # Access control functions.
850
    
851
    def _check_groups(self, groups):
852
        # raise ValueError('Bad characters in groups')
853
        pass
854
    
855
    def _check_permissions(self, path, permissions):
856
        # raise ValueError('Bad characters in permissions')
857
        
858
        # Check for existing permissions.
859
        paths = self.permissions.access_list(path)
860
        if paths:
861
            ae = AttributeError()
862
            ae.data = paths
863
            raise ae
864
    
865
    def _can_read(self, user, account, container, name):
866
        if user == account:
867
            return True
868
        path = '/'.join((account, container, name))
869
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
870
            raise NotAllowedError
871
    
872
    def _can_write(self, user, account, container, name):
873
        if user == account:
874
            return True
875
        path = '/'.join((account, container, name))
876
        if not self.permissions.access_check(path, self.WRITE, user):
877
            raise NotAllowedError
878
    
879
    def _allowed_accounts(self, user):
880
        allow = set()
881
        for path in self.permissions.access_list_paths(user):
882
            allow.add(path.split('/', 1)[0])
883
        return sorted(allow)
884
    
885
    def _allowed_containers(self, user, account):
886
        allow = set()
887
        for path in self.permissions.access_list_paths(user, account):
888
            allow.add(path.split('/', 2)[1])
889
        return sorted(allow)