Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / modular.py @ 7ca7bb08

History | View | Annotate | Download (36.2 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import logging
38
import hashlib
39
import binascii
40

    
41
from base import NotAllowedError, QuotaError, BaseBackend
42

    
43
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
44

    
45
inf = float('inf')
46

    
47

    
48
logger = logging.getLogger(__name__)
49

    
50

    
51
class HashMap(list):
52
    
53
    def __init__(self, blocksize, blockhash):
54
        super(HashMap, self).__init__()
55
        self.blocksize = blocksize
56
        self.blockhash = blockhash
57
    
58
    def _hash_raw(self, v):
59
        h = hashlib.new(self.blockhash)
60
        h.update(v)
61
        return h.digest()
62
    
63
    def hash(self):
64
        if len(self) == 0:
65
            return self._hash_raw('')
66
        if len(self) == 1:
67
            return self.__getitem__(0)
68
        
69
        h = list(self)
70
        s = 2
71
        while s < len(h):
72
            s = s * 2
73
        h += [('\x00' * len(h[0]))] * (s - len(h))
74
        while len(h) > 1:
75
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
76
        return h[0]
77

    
78

    
79
def backend_method(func=None, autocommit=1):
80
    if func is None:
81
        def fn(func):
82
            return backend_method(func, autocommit)
83
        return fn
84

    
85
    if not autocommit:
86
        return func
87
    def fn(self, *args, **kw):
88
        self.wrapper.execute()
89
        try:
90
            ret = func(self, *args, **kw)
91
            self.wrapper.commit()
92
            return ret
93
        except:
94
            self.wrapper.rollback()
95
            raise
96
    return fn
97

    
98

    
99
class ModularBackend(BaseBackend):
100
    """A modular backend.
101
    
102
    Uses modules for SQL functions and storage.
103
    """
104
    
105
    def __init__(self, db_module, db_connection, block_module, block_path):
106
        self.hash_algorithm = 'sha256'
107
        self.block_size = 4 * 1024 * 1024 # 4MB
108
        
109
        self.default_policy = {'quota': 0, 'versioning': 'manual'}
110
        
111
        __import__(db_module)
112
        self.db_module = sys.modules[db_module]
113
        self.wrapper = self.db_module.DBWrapper(db_connection)
114
        
115
        params = {'wrapper': self.wrapper}
116
        self.permissions = self.db_module.Permissions(**params)
117
        for x in ['READ', 'WRITE']:
118
            setattr(self, x, getattr(self.db_module, x))
119
        self.node = self.db_module.Node(**params)
120
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
121
            setattr(self, x, getattr(self.db_module, x))
122
        
123
        __import__(block_module)
124
        self.block_module = sys.modules[block_module]
125
        
126
        params = {'path': block_path,
127
                  'block_size': self.block_size,
128
                  'hash_algorithm': self.hash_algorithm}
129
        self.store = self.block_module.Store(**params)
130
    
131
    def close(self):
132
        self.wrapper.close()
133
    
134
    @backend_method
135
    def list_accounts(self, user, marker=None, limit=10000):
136
        """Return a list of accounts the user can access."""
137
        
138
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
139
        allowed = self._allowed_accounts(user)
140
        start, limit = self._list_limits(allowed, marker, limit)
141
        return allowed[start:start + limit]
142
    
143
    @backend_method
144
    def get_account_meta(self, user, account, until=None):
145
        """Return a dictionary with the account metadata."""
146
        
147
        logger.debug("get_account_meta: %s %s", account, until)
148
        path, node = self._lookup_account(account, user == account)
149
        if user != account:
150
            if until or node is None or account not in self._allowed_accounts(user):
151
                raise NotAllowedError
152
        try:
153
            props = self._get_properties(node, until)
154
            mtime = props[self.MTIME]
155
        except NameError:
156
            props = None
157
            mtime = until
158
        count, bytes, tstamp = self._get_statistics(node, until)
159
        tstamp = max(tstamp, mtime)
160
        if until is None:
161
            modified = tstamp
162
        else:
163
            modified = self._get_statistics(node)[2] # Overall last modification.
164
            modified = max(modified, mtime)
165
        
166
        if user != account:
167
            meta = {'name': account}
168
        else:
169
            meta = {}
170
            if props is not None:
171
                meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
172
            if until is not None:
173
                meta.update({'until_timestamp': tstamp})
174
            meta.update({'name': account, 'count': count, 'bytes': bytes})
175
        meta.update({'modified': modified})
176
        return meta
177
    
178
    @backend_method
179
    def update_account_meta(self, user, account, meta, replace=False):
180
        """Update the metadata associated with the account."""
181
        
182
        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
183
        if user != account:
184
            raise NotAllowedError
185
        path, node = self._lookup_account(account, True)
186
        self._put_metadata(user, node, meta, replace)
187
    
188
    @backend_method
189
    def get_account_groups(self, user, account):
190
        """Return a dictionary with the user groups defined for this account."""
191
        
192
        logger.debug("get_account_groups: %s", account)
193
        if user != account:
194
            if account not in self._allowed_accounts(user):
195
                raise NotAllowedError
196
            return {}
197
        self._lookup_account(account, True)
198
        return self.permissions.group_dict(account)
199
    
200
    @backend_method
201
    def update_account_groups(self, user, account, groups, replace=False):
202
        """Update the groups associated with the account."""
203
        
204
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
205
        if user != account:
206
            raise NotAllowedError
207
        self._lookup_account(account, True)
208
        self._check_groups(groups)
209
        if replace:
210
            self.permissions.group_destroy(account)
211
        for k, v in groups.iteritems():
212
            if not replace: # If not already deleted.
213
                self.permissions.group_delete(account, k)
214
            if v:
215
                self.permissions.group_addmany(account, k, v)
216
    
217
    @backend_method
218
    def get_account_policy(self, user, account):
219
        """Return a dictionary with the account policy."""
220
        
221
        logger.debug("get_account_policy: %s", account)
222
        if user != account:
223
            if account not in self._allowed_accounts(user):
224
                raise NotAllowedError
225
            return {}
226
        path, node = self._lookup_account(account, True)
227
        return self._get_policy(node)
228
    
229
    @backend_method
230
    def update_account_policy(self, user, account, policy, replace=False):
231
        """Update the policy associated with the account."""
232
        
233
        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
234
        if user != account:
235
            raise NotAllowedError
236
        path, node = self._lookup_account(account, True)
237
        self._check_policy(policy)
238
        self._put_policy(node, policy, replace)
239
    
240
    @backend_method
241
    def put_account(self, user, account, policy={}):
242
        """Create a new account with the given name."""
243
        
244
        logger.debug("put_account: %s %s", account, policy)
245
        if user != account:
246
            raise NotAllowedError
247
        node = self.node.node_lookup(account)
248
        if node is not None:
249
            raise NameError('Account already exists')
250
        if policy:
251
            self._check_policy(policy)
252
        node = self._put_path(user, self.ROOTNODE, account)
253
        self._put_policy(node, policy, True)
254
    
255
    @backend_method
256
    def delete_account(self, user, account):
257
        """Delete the account with the given name."""
258
        
259
        logger.debug("delete_account: %s", account)
260
        if user != account:
261
            raise NotAllowedError
262
        node = self.node.node_lookup(account)
263
        if node is None:
264
            return
265
        if not self.node.node_remove(node):
266
            raise IndexError('Account is not empty')
267
        self.permissions.group_destroy(account)
268
    
269
    @backend_method
270
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
271
        """Return a list of containers existing under an account."""
272
        
273
        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
274
        if user != account:
275
            if until or account not in self._allowed_accounts(user):
276
                raise NotAllowedError
277
            allowed = self._allowed_containers(user, account)
278
            start, limit = self._list_limits(allowed, marker, limit)
279
            return allowed[start:start + limit]
280
        if shared:
281
            allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
282
            allowed = list(set(allowed))
283
            start, limit = self._list_limits(allowed, marker, limit)
284
            return allowed[start:start + limit]
285
        node = self.node.node_lookup(account)
286
        return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
287
    
288
    @backend_method
289
    def get_container_meta(self, user, account, container, until=None):
290
        """Return a dictionary with the container metadata."""
291
        
292
        logger.debug("get_container_meta: %s %s %s", account, container, until)
293
        if user != account:
294
            if until or container not in self._allowed_containers(user, account):
295
                raise NotAllowedError
296
        path, node = self._lookup_container(account, container)
297
        props = self._get_properties(node, until)
298
        mtime = props[self.MTIME]
299
        count, bytes, tstamp = self._get_statistics(node, until)
300
        tstamp = max(tstamp, mtime)
301
        if until is None:
302
            modified = tstamp
303
        else:
304
            modified = self._get_statistics(node)[2] # Overall last modification.
305
            modified = max(modified, mtime)
306
        
307
        if user != account:
308
            meta = {'name': container}
309
        else:
310
            meta = dict(self.node.attribute_get(props[self.SERIAL]))
311
            if until is not None:
312
                meta.update({'until_timestamp': tstamp})
313
            meta.update({'name': container, 'count': count, 'bytes': bytes})
314
        meta.update({'modified': modified})
315
        return meta
316
    
317
    @backend_method
318
    def update_container_meta(self, user, account, container, meta, replace=False):
319
        """Update the metadata associated with the container."""
320
        
321
        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
322
        if user != account:
323
            raise NotAllowedError
324
        path, node = self._lookup_container(account, container)
325
        self._put_metadata(user, node, meta, replace)
326
    
327
    @backend_method
328
    def get_container_policy(self, user, account, container):
329
        """Return a dictionary with the container policy."""
330
        
331
        logger.debug("get_container_policy: %s %s", account, container)
332
        if user != account:
333
            if container not in self._allowed_containers(user, account):
334
                raise NotAllowedError
335
            return {}
336
        path, node = self._lookup_container(account, container)
337
        return self._get_policy(node)
338
    
339
    @backend_method
340
    def update_container_policy(self, user, account, container, policy, replace=False):
341
        """Update the policy associated with the container."""
342
        
343
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
344
        if user != account:
345
            raise NotAllowedError
346
        path, node = self._lookup_container(account, container)
347
        self._check_policy(policy)
348
        self._put_policy(node, policy, replace)
349
    
350
    @backend_method
351
    def put_container(self, user, account, container, policy={}):
352
        """Create a new container with the given name."""
353
        
354
        logger.debug("put_container: %s %s %s", account, container, policy)
355
        if user != account:
356
            raise NotAllowedError
357
        try:
358
            path, node = self._lookup_container(account, container)
359
        except NameError:
360
            pass
361
        else:
362
            raise NameError('Container already exists')
363
        if policy:
364
            self._check_policy(policy)
365
        path = '/'.join((account, container))
366
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
367
        self._put_policy(node, policy, True)
368
    
369
    @backend_method
370
    def delete_container(self, user, account, container, until=None):
371
        """Delete/purge the container with the given name."""
372
        
373
        logger.debug("delete_container: %s %s %s", account, container, until)
374
        if user != account:
375
            raise NotAllowedError
376
        path, node = self._lookup_container(account, container)
377
        
378
        if until is not None:
379
            self.node.node_purge_children(node, until, CLUSTER_HISTORY)
380
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
381
            return
382
        
383
        if self._get_statistics(node)[0] > 0:
384
            raise IndexError('Container is not empty')
385
        self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
386
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
387
        self.node.node_remove(node)
388
    
389
    @backend_method
390
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
391
        """Return a list of objects existing under a container."""
392
        
393
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
394
        allowed = []
395
        if user != account:
396
            if until:
397
                raise NotAllowedError
398
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
399
            if not allowed:
400
                raise NotAllowedError
401
        else:
402
            if shared:
403
                allowed = self.permissions.access_list_shared('/'.join((account, container)))
404
                if not allowed:
405
                    return []
406
        path, node = self._lookup_container(account, container)
407
        return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
408
    
409
    @backend_method
410
    def list_object_meta(self, user, account, container, until=None):
411
        """Return a list with all the container's object meta keys."""
412
        
413
        logger.debug("list_object_meta: %s %s %s", account, container, until)
414
        allowed = []
415
        if user != account:
416
            if until:
417
                raise NotAllowedError
418
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
419
            if not allowed:
420
                raise NotAllowedError
421
        path, node = self._lookup_container(account, container)
422
        before = until if until is not None else inf
423
        return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
424
    
425
    @backend_method
426
    def get_object_meta(self, user, account, container, name, version=None):
427
        """Return a dictionary with the object metadata."""
428
        
429
        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
430
        self._can_read(user, account, container, name)
431
        path, node = self._lookup_object(account, container, name)
432
        props = self._get_version(node, version)
433
        if version is None:
434
            modified = props[self.MTIME]
435
        else:
436
            try:
437
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
438
            except NameError: # Object may be deleted.
439
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
440
                if del_props is None:
441
                    raise NameError('Object does not exist')
442
                modified = del_props[self.MTIME]
443
        
444
        meta = dict(self.node.attribute_get(props[self.SERIAL]))
445
        meta.update({'name': name, 'bytes': props[self.SIZE]})
446
        meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
447
        meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
448
        return meta
449
    
450
    @backend_method
451
    def update_object_meta(self, user, account, container, name, meta, replace=False):
452
        """Update the metadata associated with the object."""
453
        
454
        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
455
        self._can_write(user, account, container, name)
456
        path, node = self._lookup_object(account, container, name)
457
        return self._put_metadata(user, node, meta, replace)
458
    
459
    @backend_method
460
    def get_object_permissions(self, user, account, container, name):
461
        """Return the action allowed on the object, the path
462
        from which the object gets its permissions from,
463
        along with a dictionary containing the permissions."""
464
        
465
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
466
        allowed = 'write'
467
        if user != account:
468
            path = '/'.join((account, container, name))
469
            if self.permissions.access_check(path, self.WRITE, user):
470
                allowed = 'write'
471
            elif self.permissions.access_check(path, self.READ, user):
472
                allowed = 'read'
473
            else:
474
                raise NotAllowedError
475
        path = self._lookup_object(account, container, name)[0]
476
        return (allowed,) + self.permissions.access_inherit(path)
477
    
478
    @backend_method
479
    def update_object_permissions(self, user, account, container, name, permissions):
480
        """Update the permissions associated with the object."""
481
        
482
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
483
        if user != account:
484
            raise NotAllowedError
485
        path = self._lookup_object(account, container, name)[0]
486
        self._check_permissions(path, permissions)
487
        self.permissions.access_set(path, permissions)
488
    
489
    @backend_method
490
    def get_object_public(self, user, account, container, name):
491
        """Return the public URL of the object if applicable."""
492
        
493
        logger.debug("get_object_public: %s %s %s", account, container, name)
494
        self._can_read(user, account, container, name)
495
        path = self._lookup_object(account, container, name)[0]
496
        if self.permissions.public_check(path):
497
            return '/public/' + path
498
        return None
499
    
500
    @backend_method
501
    def update_object_public(self, user, account, container, name, public):
502
        """Update the public status of the object."""
503
        
504
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
505
        self._can_write(user, account, container, name)
506
        path = self._lookup_object(account, container, name)[0]
507
        if not public:
508
            self.permissions.public_unset(path)
509
        else:
510
            self.permissions.public_set(path)
511
    
512
    @backend_method
513
    def get_object_hashmap(self, user, account, container, name, version=None):
514
        """Return the object's size and a list with partial hashes."""
515
        
516
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
517
        self._can_read(user, account, container, name)
518
        path, node = self._lookup_object(account, container, name)
519
        props = self._get_version(node, version)
520
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
521
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
522
    
523
    def _update_object_hash(self, user, account, container, name, size, hash, meta={}, replace_meta=False, permissions=None):
524
        if permissions is not None and user != account:
525
            raise NotAllowedError
526
        self._can_write(user, account, container, name)
527
        if permissions is not None:
528
            path = '/'.join((account, container, name))
529
            self._check_permissions(path, permissions)
530
        
531
        account_path, account_node = self._lookup_account(account, True)
532
        container_path, container_node = self._lookup_container(account, container)
533
        path, node = self._put_object_node(container_path, container_node, name)
534
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size, hash)
535
        
536
        # Check quota.
537
        size_delta = size # Change with versioning.
538
        if size_delta > 0:
539
            account_quota = long(self._get_policy(account_node)['quota'])
540
            container_quota = long(self._get_policy(container_node)['quota'])
541
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
542
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
543
                # This must be executed in a transaction, so the version is never created if it fails.
544
                raise QuotaError
545
        
546
        if not replace_meta and src_version_id is not None:
547
            self.node.attribute_copy(src_version_id, dest_version_id)
548
        self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
549
        if permissions is not None:
550
            self.permissions.access_set(path, permissions)
551
        return dest_version_id
552
    
553
    @backend_method
554
    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
555
        """Create/update an object with the specified size and partial hashes."""
556
        
557
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
558
        if size == 0: # No such thing as an empty hashmap.
559
            hashmap = [self.put_block('')]
560
        map = HashMap(self.block_size, self.hash_algorithm)
561
        map.extend([binascii.unhexlify(x) for x in hashmap])
562
        missing = self.store.block_search(map)
563
        if missing:
564
            ie = IndexError()
565
            ie.data = [binascii.hexlify(x) for x in missing]
566
            raise ie
567
        
568
        hash = map.hash()
569
        dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), meta, replace_meta, permissions)
570
        self.store.map_put(hash, map)
571
        return dest_version_id
572
    
573
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
574
        self._can_read(user, src_account, src_container, src_name)
575
        path, node = self._lookup_object(src_account, src_container, src_name)
576
        props = self._get_version(node, src_version)
577
        src_version_id = props[self.SERIAL]
578
        hash = props[self.HASH]
579
        size = props[self.SIZE]
580
        
581
        if replace_meta:
582
            meta = dest_meta
583
        else:
584
            meta = {}
585
        dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, meta, True, permissions)
586
        if not replace_meta:
587
            self.node.attribute_copy(src_version_id, dest_version_id)
588
            self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
589
        return dest_version_id
590
    
591
    @backend_method
592
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
593
        """Copy an object's data and metadata."""
594
        
595
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
596
        return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
597
    
598
    @backend_method
599
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
600
        """Move an object's data and metadata."""
601
        
602
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions)
603
        if user != src_account:
604
            raise NotAllowedError
605
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
606
        self._delete_object(user, src_account, src_container, src_name)
607
        return dest_version_id
608
    
609
    def _delete_object(self, user, account, container, name, until=None):
610
        if user != account:
611
            raise NotAllowedError
612
        
613
        if until is not None:
614
            path = '/'.join((account, container, name))
615
            node = self.node.node_lookup(path)
616
            if node is None:
617
                return
618
            self.node.node_purge(node, until, CLUSTER_NORMAL)
619
            self.node.node_purge(node, until, CLUSTER_HISTORY)
620
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
621
            try:
622
                props = self._get_version(node)
623
            except NameError:
624
                pass
625
            else:
626
                self.permissions.access_clear(path)
627
            return
628
        
629
        path, node = self._lookup_object(account, container, name)
630
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, 0, None, CLUSTER_DELETED)
631
        self.permissions.access_clear(path)
632
    
633
    @backend_method
634
    def delete_object(self, user, account, container, name, until=None):
635
        """Delete/purge an object."""
636
        
637
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
638
        self._delete_object(user, account, container, name, until)
639
    
640
    @backend_method
641
    def list_versions(self, user, account, container, name):
642
        """Return a list of all (version, version_timestamp) tuples for an object."""
643
        
644
        logger.debug("list_versions: %s %s %s", account, container, name)
645
        self._can_read(user, account, container, name)
646
        path, node = self._lookup_object(account, container, name)
647
        versions = self.node.node_get_versions(node)
648
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
649
    
650
    @backend_method(autocommit=0)
651
    def get_block(self, hash):
652
        """Return a block's data."""
653
        
654
        logger.debug("get_block: %s", hash)
655
        block = self.store.block_get(binascii.unhexlify(hash))
656
        if not block:
657
            raise NameError('Block does not exist')
658
        return block
659
    
660
    @backend_method(autocommit=0)
661
    def put_block(self, data):
662
        """Store a block and return the hash."""
663
        
664
        logger.debug("put_block: %s", len(data))
665
        return binascii.hexlify(self.store.block_put(data))
666
    
667
    @backend_method(autocommit=0)
668
    def update_block(self, hash, data, offset=0):
669
        """Update a known block and return the hash."""
670
        
671
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
672
        if offset == 0 and len(data) == self.block_size:
673
            return self.put_block(data)
674
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
675
        return binascii.hexlify(h)
676
    
677
    # Path functions.
678
    
679
    def _put_object_node(self, path, parent, name):
680
        path = '/'.join((path, name))
681
        node = self.node.node_lookup(path)
682
        if node is None:
683
            node = self.node.node_create(parent, path)
684
        return path, node
685
    
686
    def _put_path(self, user, parent, path):
687
        node = self.node.node_create(parent, path)
688
        self.node.version_create(node, None, 0, None, user, CLUSTER_NORMAL)
689
        return node
690
    
691
    def _lookup_account(self, account, create=True):
692
        node = self.node.node_lookup(account)
693
        if node is None and create:
694
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
695
        return account, node
696
    
697
    def _lookup_container(self, account, container):
698
        path = '/'.join((account, container))
699
        node = self.node.node_lookup(path)
700
        if node is None:
701
            raise NameError('Container does not exist')
702
        return path, node
703
    
704
    def _lookup_object(self, account, container, name):
705
        path = '/'.join((account, container, name))
706
        node = self.node.node_lookup(path)
707
        if node is None:
708
            raise NameError('Object does not exist')
709
        return path, node
710
    
711
    def _get_properties(self, node, until=None):
712
        """Return properties until the timestamp given."""
713
        
714
        before = until if until is not None else inf
715
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
716
        if props is None and until is not None:
717
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
718
        if props is None:
719
            raise NameError('Path does not exist')
720
        return props
721
    
722
    def _get_statistics(self, node, until=None):
723
        """Return count, sum of size and latest timestamp of everything under node."""
724
        
725
        if until is None:
726
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
727
        else:
728
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
729
        if stats is None:
730
            stats = (0, 0, 0)
731
        return stats
732
    
733
    def _get_version(self, node, version=None):
734
        if version is None:
735
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
736
            if props is None:
737
                raise NameError('Object does not exist')
738
        else:
739
            try:
740
                version = int(version)
741
            except ValueError:
742
                raise IndexError('Version does not exist')
743
            props = self.node.version_get_properties(version)
744
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
745
                raise IndexError('Version does not exist')
746
        return props
747
    
748
    def _put_version_duplicate(self, user, node, size=None, hash=None, cluster=CLUSTER_NORMAL):
749
        """Create a new version of the node."""
750
        
751
        props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
752
        if props is not None:
753
            src_version_id = props[self.SERIAL]
754
            src_hash = props[self.HASH]
755
            src_size = props[self.SIZE]
756
        else:
757
            src_version_id = None
758
            src_hash = None
759
            src_size = 0
760
        if size is None:
761
            hash = src_hash # This way hash can be set to None.
762
            size = src_size
763
        
764
        if src_version_id is not None:
765
            self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
766
        dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, cluster)
767
        return src_version_id, dest_version_id
768
    
769
    def _put_metadata(self, user, node, meta, replace=False):
770
        """Create a new version and store metadata."""
771
        
772
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
773
        
774
        # TODO: Merge with other functions that update metadata...
775
        if not replace:
776
            if src_version_id is not None:
777
                self.node.attribute_copy(src_version_id, dest_version_id)
778
            self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
779
            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
780
        else:
781
            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
782
        return dest_version_id
783
    
784
    def _list_limits(self, listing, marker, limit):
785
        start = 0
786
        if marker:
787
            try:
788
                start = listing.index(marker) + 1
789
            except ValueError:
790
                pass
791
        if not limit or limit > 10000:
792
            limit = 10000
793
        return start, limit
794
    
795
    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
796
        cont_prefix = path + '/'
797
        prefix = cont_prefix + prefix
798
        start = cont_prefix + marker if marker else None
799
        before = until if until is not None else inf
800
        filterq = ','.join(keys) if keys else None
801
        
802
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
803
        objects.extend([(p, None) for p in prefixes] if virtual else [])
804
        objects.sort(key=lambda x: x[0])
805
        objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
806
        
807
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
808
        return objects[start:start + limit]
809
    
810
    # Policy functions.
811
    
812
    def _check_policy(self, policy):
813
        for k in policy.keys():
814
            if policy[k] == '':
815
                policy[k] = self.default_policy.get(k)
816
        for k, v in policy.iteritems():
817
            if k == 'quota':
818
                q = int(v) # May raise ValueError.
819
                if q < 0:
820
                    raise ValueError
821
            elif k == 'versioning':
822
                if v not in ['auto', 'manual', 'none']:
823
                    raise ValueError
824
            else:
825
                raise ValueError
826
    
827
    def _put_policy(self, node, policy, replace):
828
        if replace:
829
            for k, v in self.default_policy.iteritems():
830
                if k not in policy:
831
                    policy[k] = v
832
        self.node.policy_set(node, policy)
833
    
834
    def _get_policy(self, node):
835
        policy = self.default_policy.copy()
836
        policy.update(self.node.policy_get(node))
837
        return policy
838
    
839
    # Access control functions.
840
    
841
    def _check_groups(self, groups):
842
        # raise ValueError('Bad characters in groups')
843
        pass
844
    
845
    def _check_permissions(self, path, permissions):
846
        # raise ValueError('Bad characters in permissions')
847
        
848
        # Check for existing permissions.
849
        paths = self.permissions.access_list(path)
850
        if paths:
851
            ae = AttributeError()
852
            ae.data = paths
853
            raise ae
854
    
855
    def _can_read(self, user, account, container, name):
856
        if user == account:
857
            return True
858
        path = '/'.join((account, container, name))
859
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
860
            raise NotAllowedError
861
    
862
    def _can_write(self, user, account, container, name):
863
        if user == account:
864
            return True
865
        path = '/'.join((account, container, name))
866
        if not self.permissions.access_check(path, self.WRITE, user):
867
            raise NotAllowedError
868
    
869
    def _allowed_accounts(self, user):
870
        allow = set()
871
        for path in self.permissions.access_list_paths(user):
872
            allow.add(path.split('/', 1)[0])
873
        return sorted(allow)
874
    
875
    def _allowed_containers(self, user, account):
876
        allow = set()
877
        for path in self.permissions.access_list_paths(user, account):
878
            allow.add(path.split('/', 2)[1])
879
        return sorted(allow)